vrl/stdlib/
http_request.rs

//! # HTTP Request Function
//!
//! This function makes HTTP requests (any method), but it is not recommended for frequent or performance-critical workflows.
//! It performs synchronous, blocking operations, which can reduce concurrency and increase response times.
//!
//! Because of potential network delays or failures, avoid using this function in latency-sensitive contexts.
7
8use crate::compiler::prelude::*;
9
10#[cfg(not(target_arch = "wasm32"))]
11#[allow(clippy::similar_names)]
12mod non_wasm {
13    use super::{
14        Context, Expression, ExpressionError, ExpressionExt, FunctionExpression, Resolved, TypeDef,
15        TypeState, Value, VrlValueConvert,
16    };
17    use crate::value::value::ObjectMap;
18    use reqwest_middleware::{
19        ClientBuilder, ClientWithMiddleware,
20        reqwest::{
21            Client, Method, Proxy,
22            header::{HeaderMap, HeaderName, HeaderValue},
23        },
24    };
25    use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
26    use std::sync::LazyLock;
27    use tokio::runtime::Handle;
28    use tokio::time::Duration;
29    use tokio::{runtime, task};
30
31    static STD_CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| build_client(None));
32
33    fn build_client(proxies: Option<Vec<Proxy>>) -> ClientWithMiddleware {
34        let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
35
36        let mut client_builder = Client::builder()
37            .connect_timeout(Duration::from_secs(5))
38            .timeout(Duration::from_secs(10));
39
40        if let Some(proxies) = proxies {
41            for proxy in proxies {
42                client_builder = client_builder.proxy(proxy);
43            }
44        }
45
46        let client = client_builder
47            .build()
48            .expect("Failed to create HTTP client");
49
50        ClientBuilder::new(client)
51            .with(RetryTransientMiddleware::new_with_policy(retry_policy))
52            .build()
53    }
54
55    /// Redacts sensitive header values to prevent them from appearing in error messages.
56    /// Headers like Authorization, Cookie, and API keys are replaced with ***.
57    fn redact_sensitive_headers(headers: &ObjectMap) -> ObjectMap {
58        const SENSITIVE_HEADERS: &[&str] = &[
59            "authorization",
60            "cookie",
61            "set-cookie",
62            "x-api-key",
63            "api-key",
64            "x-auth-token",
65            "proxy-authorization",
66        ];
67
68        headers
69            .iter()
70            .map(|(key, value)| {
71                let key_lower = key.as_ref().to_lowercase();
72                if SENSITIVE_HEADERS.contains(&key_lower.as_str())
73                    || key_lower.contains("token")
74                    || key_lower.contains("secret")
75                    || key_lower.contains("password")
76                {
77                    (key.clone(), Value::from("***"))
78                } else {
79                    (key.clone(), value.clone())
80                }
81            })
82            .collect()
83    }
84
85    async fn http_request(
86        client: &ClientWithMiddleware,
87        url: &Value,
88        method: &Value,
89        headers: Value,
90        body: &Value,
91        redact_headers: bool,
92    ) -> Resolved {
93        let url = url.try_bytes_utf8_lossy()?;
94        let method = method.try_bytes_utf8_lossy()?.to_uppercase();
95        let headers = headers.try_object()?;
96        let body = body.try_bytes_utf8_lossy()?;
97
98        let format_headers = |headers: &ObjectMap| -> Value {
99            if redact_headers {
100                Value::Object(redact_sensitive_headers(headers))
101            } else {
102                Value::Object(headers.clone())
103            }
104        };
105
106        let method = Method::try_from(method.as_str())
107            .map_err(|_| format!("Unsupported HTTP method: {method}"))?;
108        let mut header_map = HeaderMap::new();
109
110        for (key, value) in &headers {
111            let key = key
112                .parse::<HeaderName>()
113                .map_err(|_| format!("Invalid header key: {key}"))?;
114            let val = value
115                .try_bytes_utf8_lossy()
116                .map_err(|e| {
117                    format!(
118                        "Invalid header value for key '{key}': {} (headers: {})",
119                        e,
120                        format_headers(&headers)
121                    )
122                })?
123                .parse::<HeaderValue>()
124                .map_err(|_| {
125                    format!(
126                        "Invalid header value for key '{key}' (headers: {})",
127                        format_headers(&headers)
128                    )
129                })?;
130            header_map.insert(key, val);
131        }
132
133        let response = client
134            .request(method.clone(), url.as_ref())
135            .headers(header_map)
136            .body(body.as_bytes().to_owned())
137            .send()
138            .await
139            .map_err(|e| {
140                format!(
141                    "HTTP request failed: {} (url: {}, method: {}, headers: {})",
142                    e,
143                    url,
144                    method,
145                    format_headers(&headers)
146                )
147            })?;
148
149        let body = response
150            .text()
151            .await
152            .map_err(|e| format!("Failed to read response body: {e}"))?;
153
154        Ok(body.into())
155    }
156
157    fn make_proxies(
158        http_proxy: Option<Value>,
159        https_proxy: Option<Value>,
160    ) -> Result<Option<Vec<Proxy>>, ExpressionError> {
161        let mut proxies = Vec::new();
162
163        if let Some(http_proxy) = http_proxy {
164            proxies.push(
165                Proxy::http(http_proxy.try_bytes_utf8_lossy()?.as_ref())
166                    .map_err(|e| format!("Invalid proxy: {e}"))?,
167            );
168        }
169
170        if let Some(https_proxy) = https_proxy {
171            proxies.push(
172                Proxy::https(https_proxy.try_bytes_utf8_lossy()?.as_ref())
173                    .map_err(|e| format!("Invalid proxy: {e}"))?,
174            );
175        }
176
177        Ok((!proxies.is_empty()).then_some(proxies))
178    }
179
180    #[derive(Debug, Clone)]
181    pub(super) enum ClientOrProxies {
182        Client(ClientWithMiddleware),
183        Proxies {
184            http_proxy: Option<Box<dyn Expression>>,
185            https_proxy: Option<Box<dyn Expression>>,
186        },
187    }
188
189    impl ClientOrProxies {
190        pub(super) fn new(
191            state: &TypeState,
192            http_proxy: Option<Box<dyn Expression>>,
193            https_proxy: Option<Box<dyn Expression>>,
194        ) -> Result<Self, ExpressionError> {
195            let const_http_proxy = http_proxy
196                .as_ref()
197                .map(|http_proxy| http_proxy.resolve_constant(state));
198            let const_https_proxy = https_proxy
199                .as_ref()
200                .map(|https_proxy| https_proxy.resolve_constant(state));
201
202            match (const_http_proxy, const_https_proxy) {
203                // No proxies specified
204                (None, None) => Ok(Self::no_proxies()),
205                // Only http proxy specified and could resolve as constant
206                (Some(Some(http)), None) => {
207                    Ok(Self::Client(build_client(make_proxies(Some(http), None)?)))
208                }
209                // Only https proxy specified and could resolve as constant
210                (None, Some(Some(https))) => {
211                    Ok(Self::Client(build_client(make_proxies(None, Some(https))?)))
212                }
213                // Both proxies specified and could resolve as constants
214                (Some(Some(http)), Some(Some(https))) => Ok(Self::Client(build_client(
215                    make_proxies(Some(http), Some(https))?,
216                ))),
217                // Couldn't evaluate as constants
218                _ => Ok(Self::new_proxies_no_const_resolve(http_proxy, https_proxy)),
219            }
220        }
221
222        pub fn no_proxies() -> Self {
223            Self::Proxies {
224                http_proxy: None,
225                https_proxy: None,
226            }
227        }
228
229        pub fn new_proxies_no_const_resolve(
230            http_proxy: Option<Box<dyn Expression>>,
231            https_proxy: Option<Box<dyn Expression>>,
232        ) -> Self {
233            Self::Proxies {
234                http_proxy,
235                https_proxy,
236            }
237        }
238
239        fn get_client(&self, ctx: &mut Context) -> Result<ClientWithMiddleware, ExpressionError> {
240            match self {
241                Self::Client(client) => Ok(client.clone()),
242                Self::Proxies {
243                    http_proxy,
244                    https_proxy,
245                } => {
246                    let http_proxy = http_proxy
247                        .as_ref()
248                        .map(|http_proxy| http_proxy.resolve(ctx))
249                        .transpose()?;
250
251                    let https_proxy = https_proxy
252                        .as_ref()
253                        .map(|https_proxy| https_proxy.resolve(ctx))
254                        .transpose()?;
255
256                    if let Some(proxies) = make_proxies(http_proxy, https_proxy)? {
257                        Ok(build_client(Some(proxies)))
258                    } else {
259                        Ok(STD_CLIENT.clone())
260                    }
261                }
262            }
263        }
264    }
265
266    #[derive(Debug, Clone)]
267    pub(super) struct HttpRequestFn {
268        pub(super) url: Box<dyn Expression>,
269        pub(super) method: Option<Box<dyn Expression>>,
270        pub(super) headers: Option<Box<dyn Expression>>,
271        pub(super) body: Option<Box<dyn Expression>>,
272        pub(super) client_or_proxies: ClientOrProxies,
273        pub(super) redact_headers: Option<Box<dyn Expression>>,
274    }
275
276    impl FunctionExpression for HttpRequestFn {
277        fn resolve(&self, ctx: &mut Context) -> Resolved {
278            let url = self.url.resolve(ctx)?;
279            let method = self
280                .method
281                .map_resolve_with_default(ctx, || super::DEFAULT_METHOD.clone())?;
282            let headers = self
283                .headers
284                .map_resolve_with_default(ctx, || super::DEFAULT_HEADERS.clone())?;
285            let body = self
286                .body
287                .map_resolve_with_default(ctx, || super::DEFAULT_BODY.clone())?;
288            let client = self.client_or_proxies.get_client(ctx)?;
289            let redact_headers = self
290                .redact_headers
291                .map_resolve_with_default(ctx, || super::DEFAULT_REDACT_HEADERS.clone())?
292                .try_boolean()?;
293
294            // block_in_place runs the HTTP request synchronously
295            // without blocking Tokio's async worker threads.
296            // This temporarily moves execution to a blocking-compatible thread.
297            task::block_in_place(|| {
298                if let Ok(handle) = Handle::try_current() {
299                    handle.block_on(async {
300                        http_request(&client, &url, &method, headers, &body, redact_headers).await
301                    })
302                } else {
303                    let runtime = runtime::Builder::new_current_thread()
304                        .enable_all()
305                        .build()
306                        .expect("tokio runtime creation failed");
307
308                    runtime.block_on(async move {
309                        http_request(&client, &url, &method, headers, &body, redact_headers).await
310                    })
311                }
312            })
313        }
314
315        fn type_def(&self, _: &TypeState) -> TypeDef {
316            TypeDef::bytes().fallible()
317        }
318    }
319}
320
321#[allow(clippy::wildcard_imports)]
322#[cfg(not(target_arch = "wasm32"))]
323use non_wasm::*;
324
325use std::sync::LazyLock;
326
327static DEFAULT_METHOD: LazyLock<Value> = LazyLock::new(|| Value::Bytes(Bytes::from("get")));
328static DEFAULT_HEADERS: LazyLock<Value> =
329    LazyLock::new(|| Value::Object(std::collections::BTreeMap::new()));
330static DEFAULT_BODY: LazyLock<Value> = LazyLock::new(|| Value::Bytes(Bytes::from("")));
331static DEFAULT_REDACT_HEADERS: LazyLock<Value> = LazyLock::new(|| Value::Boolean(true));
332
333static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
334    vec![
335        Parameter::required("url", kind::BYTES, "The URL to make the HTTP request to."),
336        Parameter::optional(
337            "method",
338            kind::BYTES,
339            "The HTTP method to use (e.g., GET, POST, PUT, DELETE). Defaults to GET.",
340        )
341        .default(&DEFAULT_METHOD),
342        Parameter::optional(
343            "headers",
344            kind::OBJECT,
345            "An object containing HTTP headers to send with the request.",
346        )
347        .default(&DEFAULT_HEADERS),
348        Parameter::optional("body", kind::BYTES, "The request body content to send.")
349            .default(&DEFAULT_BODY),
350        Parameter::optional(
351            "http_proxy",
352            kind::BYTES,
353            "HTTP proxy URL to use for the request.",
354        ),
355        Parameter::optional(
356            "https_proxy",
357            kind::BYTES,
358            "HTTPS proxy URL to use for the request.",
359        ),
360        Parameter::optional(
361            "redact_headers",
362            kind::BOOLEAN,
363            "Whether to redact sensitive header values in error messages.",
364        )
365        .default(&DEFAULT_REDACT_HEADERS),
366    ]
367});
368
/// The `http_request` VRL function: performs a blocking HTTP request and returns the
/// response body.
#[derive(Debug, Clone, Copy)]
pub struct HttpRequest;
371
372impl Function for HttpRequest {
373    fn identifier(&self) -> &'static str {
374        "http_request"
375    }
376
377    fn usage(&self) -> &'static str {
378        "Makes an HTTP request to the specified URL."
379    }
380
381    fn notices(&self) -> &'static [&'static str] {
382        &[indoc! {"
383            This function performs synchronous blocking operations and is not recommended for
384            frequent or performance-critical workflows due to potential network-related delays.
385        "}]
386    }
387
388    fn category(&self) -> &'static str {
389        Category::System.as_ref()
390    }
391
392    fn return_kind(&self) -> u16 {
393        kind::BYTES
394    }
395
396    #[cfg(not(feature = "test"))]
397    fn examples(&self) -> &'static [Example] {
398        &[]
399    }
400
401    #[cfg(feature = "test")]
402    fn examples(&self) -> &'static [Example] {
403        &[
404            example! {
405                title: "Basic HTTP request",
406                source: r#"http_request("https://httpbin.org/get")"#,
407                result: Ok(
408                    r#"{"args":{},"headers":{"Accept":"*/*","Host":"httpbin.org"},"url":"https://httpbin.org/get"}"#,
409                ),
410            },
411            example! {
412                title: "HTTP request with bearer token",
413                source: r#"http_request("https://httpbin.org/bearer", headers: {"Authorization": "Bearer my_token"})"#,
414                result: Ok(r#"{"authenticated":true,"token":"my_token"}"#),
415            },
416            example! {
417                title: "HTTP PUT request",
418                source: r#"http_request("https://httpbin.org/put", method: "put")"#,
419                result: Ok(r#"{"args":{},"data": "","url": "https://httpbin.org/put"}"#),
420            },
421            example! {
422                title: "HTTP POST request with body",
423                source: r#"http_request("https://httpbin.org/post", method: "post", body: "{\"data\":{\"hello\":\"world\"}}")"#,
424                result: Ok(r#"{"data":"{\"data\":{\"hello\":\"world\"}}"}"#),
425            },
426        ]
427    }
428
429    fn parameters(&self) -> &'static [Parameter] {
430        PARAMETERS.as_slice()
431    }
432
433    #[cfg(not(target_arch = "wasm32"))]
434    #[allow(clippy::similar_names)]
435    fn compile(
436        &self,
437        state: &state::TypeState,
438        _ctx: &mut FunctionCompileContext,
439        arguments: ArgumentList,
440    ) -> Compiled {
441        let url = arguments.required("url");
442        let method = arguments.optional("method");
443        let headers = arguments.optional("headers");
444        let body = arguments.optional("body");
445        let http_proxy = arguments.optional("http_proxy");
446        let https_proxy = arguments.optional("https_proxy");
447        let redact_headers = arguments.optional("redact_headers");
448
449        let client_or_proxies = ClientOrProxies::new(state, http_proxy, https_proxy)
450            .map_err(|err| Box::new(err) as Box<dyn DiagnosticMessage>)?;
451
452        Ok(HttpRequestFn {
453            url,
454            method,
455            headers,
456            body,
457            client_or_proxies,
458            redact_headers,
459        }
460        .as_expr())
461    }
462
463    #[cfg(target_arch = "wasm32")]
464    fn compile(
465        &self,
466        _state: &state::TypeState,
467        ctx: &mut FunctionCompileContext,
468        _arguments: ArgumentList,
469    ) -> Compiled {
470        Ok(
471            super::wasm_unsupported_function::WasmUnsupportedFunction::new(
472                ctx.span(),
473                TypeDef::bytes().fallible(),
474            )
475            .as_expr(),
476        )
477    }
478}
479
#[cfg(all(feature = "test", test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::value;

    /// Resolves `http_request_fn` against an empty event, using default runtime state and
    /// the default time zone.
    fn execute_http_request(http_request_fn: &HttpRequestFn) -> Resolved {
        let time_zone = TimeZone::default();
        let mut target = value!({});
        let mut rt_state = state::RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut rt_state, &time_zone);
        http_request_fn.resolve(&mut ctx)
    }

    /// Builds a `get` request with an empty body. Each test varies only the URL, the
    /// headers, the proxy configuration and the redaction flag.
    fn get_request(
        url: Box<dyn Expression>,
        headers: Box<dyn Expression>,
        client_or_proxies: ClientOrProxies,
        redact_headers: Box<dyn Expression>,
    ) -> HttpRequestFn {
        HttpRequestFn {
            url,
            method: Some(expr!("get")),
            headers: Some(headers),
            body: Some(expr!("")),
            client_or_proxies,
            redact_headers: Some(redact_headers),
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_basic_get_request() {
        let func = get_request(
            expr!("https://httpbin.org/get"),
            expr!({}),
            ClientOrProxies::no_proxies(),
            expr!(true),
        );

        let resolved = execute_http_request(&func).expect("HTTP request failed");
        let text = resolved
            .try_bytes_utf8_lossy()
            .expect("Failed to convert response to string");
        let response: serde_json::Value =
            serde_json::from_str(text.as_ref()).expect("Failed to parse JSON");

        assert!(response.get("url").is_some());
        assert_eq!(response["url"], "https://httpbin.org/get");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_malformed_url() {
        let func = get_request(
            expr!("not-a-valid-url"),
            expr!({}),
            ClientOrProxies::no_proxies(),
            expr!(true),
        );

        let outcome = execute_http_request(&func);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("HTTP request failed"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_invalid_header() {
        let func = get_request(
            expr!("https://httpbin.org/get"),
            expr!({"Invalid Header With Spaces": "value"}),
            ClientOrProxies::no_proxies(),
            expr!(true),
        );

        let outcome = execute_http_request(&func);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid header key"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_invalid_proxy() {
        let func = get_request(
            expr!("https://httpbin.org/get"),
            expr!({}),
            ClientOrProxies::new_proxies_no_const_resolve(None, Some(expr!("not^a&valid*url"))),
            expr!(true),
        );

        let outcome = execute_http_request(&func);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid proxy"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_sensitive_headers_redacted() {
        let func = get_request(
            expr!("not-a-valid-url"),
            expr!({
                "Authorization": "Bearer super_secret_12345",
                "X-Api-Key": "key-67890",
                "Content-Type": "application/json",
                "Cookie": "session=abcdef",
                "X-Custom-Token": "another-secret",
                "User-Agent": "VRL/0.28"
            }),
            ClientOrProxies::no_proxies(),
            expr!(true),
        );

        let outcome = execute_http_request(&func);
        assert!(outcome.is_err());
        let error = outcome.unwrap_err().to_string();

        // Sensitive values must never appear in the message.
        assert!(
            !error.contains("super_secret_12345"),
            "Authorization token should be redacted"
        );
        assert!(!error.contains("key-67890"), "API key should be redacted");
        assert!(!error.contains("abcdef"), "Cookie should be redacted");
        assert!(
            !error.contains("another-secret"),
            "Custom token should be redacted"
        );

        // The placeholder is shown instead.
        assert!(error.contains("***"), "Should contain *** placeholder");

        // Harmless headers stay readable for debugging.
        assert!(
            error.contains("application/json"),
            "Non-sensitive headers should not be redacted"
        );
        assert!(
            error.contains("VRL/0.28"),
            "User-Agent should not be redacted"
        );
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_redact_headers_disabled() {
        let func = get_request(
            expr!("not-a-valid-url"),
            expr!({
                "Authorization": "Bearer super_secret_12345",
                "Content-Type": "application/json"
            }),
            ClientOrProxies::no_proxies(),
            expr!(false),
        );

        let outcome = execute_http_request(&func);
        assert!(outcome.is_err());
        let error = outcome.unwrap_err().to_string();

        // With redaction turned off, the raw header values are reported.
        assert!(
            error.contains("super_secret_12345"),
            "Authorization token should not be redacted when redact_headers is false"
        );
    }
}