1use crate::compiler::prelude::*;
9
10#[cfg(not(target_arch = "wasm32"))]
11#[allow(clippy::similar_names)]
12mod non_wasm {
13 use super::{
14 Context, Expression, ExpressionError, ExpressionExt, FunctionExpression, Resolved, TypeDef,
15 TypeState, Value, VrlValueConvert,
16 };
17 use crate::value::value::ObjectMap;
18 use reqwest_middleware::{
19 ClientBuilder, ClientWithMiddleware,
20 reqwest::{
21 Client, Method, Proxy,
22 header::{HeaderMap, HeaderName, HeaderValue},
23 },
24 };
25 use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
26 use std::sync::LazyLock;
27 use tokio::runtime::Handle;
28 use tokio::time::Duration;
29 use tokio::{runtime, task};
30
31 static STD_CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| build_client(None));
32
33 fn build_client(proxies: Option<Vec<Proxy>>) -> ClientWithMiddleware {
34 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
35
36 let mut client_builder = Client::builder()
37 .connect_timeout(Duration::from_secs(5))
38 .timeout(Duration::from_secs(10));
39
40 if let Some(proxies) = proxies {
41 for proxy in proxies {
42 client_builder = client_builder.proxy(proxy);
43 }
44 }
45
46 let client = client_builder
47 .build()
48 .expect("Failed to create HTTP client");
49
50 ClientBuilder::new(client)
51 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
52 .build()
53 }
54
55 fn redact_sensitive_headers(headers: &ObjectMap) -> ObjectMap {
58 const SENSITIVE_HEADERS: &[&str] = &[
59 "authorization",
60 "cookie",
61 "set-cookie",
62 "x-api-key",
63 "api-key",
64 "x-auth-token",
65 "proxy-authorization",
66 ];
67
68 headers
69 .iter()
70 .map(|(key, value)| {
71 let key_lower = key.as_ref().to_lowercase();
72 if SENSITIVE_HEADERS.contains(&key_lower.as_str())
73 || key_lower.contains("token")
74 || key_lower.contains("secret")
75 || key_lower.contains("password")
76 {
77 (key.clone(), Value::from("***"))
78 } else {
79 (key.clone(), value.clone())
80 }
81 })
82 .collect()
83 }
84
85 async fn http_request(
86 client: &ClientWithMiddleware,
87 url: &Value,
88 method: &Value,
89 headers: Value,
90 body: &Value,
91 redact_headers: bool,
92 ) -> Resolved {
93 let url = url.try_bytes_utf8_lossy()?;
94 let method = method.try_bytes_utf8_lossy()?.to_uppercase();
95 let headers = headers.try_object()?;
96 let body = body.try_bytes_utf8_lossy()?;
97
98 let format_headers = |headers: &ObjectMap| -> Value {
99 if redact_headers {
100 Value::Object(redact_sensitive_headers(headers))
101 } else {
102 Value::Object(headers.clone())
103 }
104 };
105
106 let method = Method::try_from(method.as_str())
107 .map_err(|_| format!("Unsupported HTTP method: {method}"))?;
108 let mut header_map = HeaderMap::new();
109
110 for (key, value) in &headers {
111 let key = key
112 .parse::<HeaderName>()
113 .map_err(|_| format!("Invalid header key: {key}"))?;
114 let val = value
115 .try_bytes_utf8_lossy()
116 .map_err(|e| {
117 format!(
118 "Invalid header value for key '{key}': {} (headers: {})",
119 e,
120 format_headers(&headers)
121 )
122 })?
123 .parse::<HeaderValue>()
124 .map_err(|_| {
125 format!(
126 "Invalid header value for key '{key}' (headers: {})",
127 format_headers(&headers)
128 )
129 })?;
130 header_map.insert(key, val);
131 }
132
133 let response = client
134 .request(method.clone(), url.as_ref())
135 .headers(header_map)
136 .body(body.as_bytes().to_owned())
137 .send()
138 .await
139 .map_err(|e| {
140 format!(
141 "HTTP request failed: {} (url: {}, method: {}, headers: {})",
142 e,
143 url,
144 method,
145 format_headers(&headers)
146 )
147 })?;
148
149 let body = response
150 .text()
151 .await
152 .map_err(|e| format!("Failed to read response body: {e}"))?;
153
154 Ok(body.into())
155 }
156
157 fn make_proxies(
158 http_proxy: Option<Value>,
159 https_proxy: Option<Value>,
160 ) -> Result<Option<Vec<Proxy>>, ExpressionError> {
161 let mut proxies = Vec::new();
162
163 if let Some(http_proxy) = http_proxy {
164 proxies.push(
165 Proxy::http(http_proxy.try_bytes_utf8_lossy()?.as_ref())
166 .map_err(|e| format!("Invalid proxy: {e}"))?,
167 );
168 }
169
170 if let Some(https_proxy) = https_proxy {
171 proxies.push(
172 Proxy::https(https_proxy.try_bytes_utf8_lossy()?.as_ref())
173 .map_err(|e| format!("Invalid proxy: {e}"))?,
174 );
175 }
176
177 Ok((!proxies.is_empty()).then_some(proxies))
178 }
179
180 #[derive(Debug, Clone)]
181 pub(super) enum ClientOrProxies {
182 Client(ClientWithMiddleware),
183 Proxies {
184 http_proxy: Option<Box<dyn Expression>>,
185 https_proxy: Option<Box<dyn Expression>>,
186 },
187 }
188
189 impl ClientOrProxies {
190 pub(super) fn new(
191 state: &TypeState,
192 http_proxy: Option<Box<dyn Expression>>,
193 https_proxy: Option<Box<dyn Expression>>,
194 ) -> Result<Self, ExpressionError> {
195 let const_http_proxy = http_proxy
196 .as_ref()
197 .map(|http_proxy| http_proxy.resolve_constant(state));
198 let const_https_proxy = https_proxy
199 .as_ref()
200 .map(|https_proxy| https_proxy.resolve_constant(state));
201
202 match (const_http_proxy, const_https_proxy) {
203 (None, None) => Ok(Self::no_proxies()),
205 (Some(Some(http)), None) => {
207 Ok(Self::Client(build_client(make_proxies(Some(http), None)?)))
208 }
209 (None, Some(Some(https))) => {
211 Ok(Self::Client(build_client(make_proxies(None, Some(https))?)))
212 }
213 (Some(Some(http)), Some(Some(https))) => Ok(Self::Client(build_client(
215 make_proxies(Some(http), Some(https))?,
216 ))),
217 _ => Ok(Self::new_proxies_no_const_resolve(http_proxy, https_proxy)),
219 }
220 }
221
222 pub fn no_proxies() -> Self {
223 Self::Proxies {
224 http_proxy: None,
225 https_proxy: None,
226 }
227 }
228
229 pub fn new_proxies_no_const_resolve(
230 http_proxy: Option<Box<dyn Expression>>,
231 https_proxy: Option<Box<dyn Expression>>,
232 ) -> Self {
233 Self::Proxies {
234 http_proxy,
235 https_proxy,
236 }
237 }
238
239 fn get_client(&self, ctx: &mut Context) -> Result<ClientWithMiddleware, ExpressionError> {
240 match self {
241 Self::Client(client) => Ok(client.clone()),
242 Self::Proxies {
243 http_proxy,
244 https_proxy,
245 } => {
246 let http_proxy = http_proxy
247 .as_ref()
248 .map(|http_proxy| http_proxy.resolve(ctx))
249 .transpose()?;
250
251 let https_proxy = https_proxy
252 .as_ref()
253 .map(|https_proxy| https_proxy.resolve(ctx))
254 .transpose()?;
255
256 if let Some(proxies) = make_proxies(http_proxy, https_proxy)? {
257 Ok(build_client(Some(proxies)))
258 } else {
259 Ok(STD_CLIENT.clone())
260 }
261 }
262 }
263 }
264 }
265
266 #[derive(Debug, Clone)]
267 pub(super) struct HttpRequestFn {
268 pub(super) url: Box<dyn Expression>,
269 pub(super) method: Option<Box<dyn Expression>>,
270 pub(super) headers: Option<Box<dyn Expression>>,
271 pub(super) body: Option<Box<dyn Expression>>,
272 pub(super) client_or_proxies: ClientOrProxies,
273 pub(super) redact_headers: Option<Box<dyn Expression>>,
274 }
275
276 impl FunctionExpression for HttpRequestFn {
277 fn resolve(&self, ctx: &mut Context) -> Resolved {
278 let url = self.url.resolve(ctx)?;
279 let method = self
280 .method
281 .map_resolve_with_default(ctx, || super::DEFAULT_METHOD.clone())?;
282 let headers = self
283 .headers
284 .map_resolve_with_default(ctx, || super::DEFAULT_HEADERS.clone())?;
285 let body = self
286 .body
287 .map_resolve_with_default(ctx, || super::DEFAULT_BODY.clone())?;
288 let client = self.client_or_proxies.get_client(ctx)?;
289 let redact_headers = self
290 .redact_headers
291 .map_resolve_with_default(ctx, || super::DEFAULT_REDACT_HEADERS.clone())?
292 .try_boolean()?;
293
294 task::block_in_place(|| {
298 if let Ok(handle) = Handle::try_current() {
299 handle.block_on(async {
300 http_request(&client, &url, &method, headers, &body, redact_headers).await
301 })
302 } else {
303 let runtime = runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 .expect("tokio runtime creation failed");
307
308 runtime.block_on(async move {
309 http_request(&client, &url, &method, headers, &body, redact_headers).await
310 })
311 }
312 })
313 }
314
315 fn type_def(&self, _: &TypeState) -> TypeDef {
316 TypeDef::bytes().fallible()
317 }
318 }
319}
320
321#[allow(clippy::wildcard_imports)]
322#[cfg(not(target_arch = "wasm32"))]
323use non_wasm::*;
324
325use std::sync::LazyLock;
326
327static DEFAULT_METHOD: LazyLock<Value> = LazyLock::new(|| Value::Bytes(Bytes::from("get")));
328static DEFAULT_HEADERS: LazyLock<Value> =
329 LazyLock::new(|| Value::Object(std::collections::BTreeMap::new()));
330static DEFAULT_BODY: LazyLock<Value> = LazyLock::new(|| Value::Bytes(Bytes::from("")));
331static DEFAULT_REDACT_HEADERS: LazyLock<Value> = LazyLock::new(|| Value::Boolean(true));
332
333static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
334 vec![
335 Parameter::required("url", kind::BYTES, "The URL to make the HTTP request to."),
336 Parameter::optional(
337 "method",
338 kind::BYTES,
339 "The HTTP method to use (e.g., GET, POST, PUT, DELETE). Defaults to GET.",
340 )
341 .default(&DEFAULT_METHOD),
342 Parameter::optional(
343 "headers",
344 kind::OBJECT,
345 "An object containing HTTP headers to send with the request.",
346 )
347 .default(&DEFAULT_HEADERS),
348 Parameter::optional("body", kind::BYTES, "The request body content to send.")
349 .default(&DEFAULT_BODY),
350 Parameter::optional(
351 "http_proxy",
352 kind::BYTES,
353 "HTTP proxy URL to use for the request.",
354 ),
355 Parameter::optional(
356 "https_proxy",
357 kind::BYTES,
358 "HTTPS proxy URL to use for the request.",
359 ),
360 Parameter::optional(
361 "redact_headers",
362 kind::BOOLEAN,
363 "Whether to redact sensitive header values in error messages.",
364 )
365 .default(&DEFAULT_REDACT_HEADERS),
366 ]
367});
368
369#[derive(Clone, Copy, Debug)]
370pub struct HttpRequest;
371
372impl Function for HttpRequest {
373 fn identifier(&self) -> &'static str {
374 "http_request"
375 }
376
377 fn usage(&self) -> &'static str {
378 "Makes an HTTP request to the specified URL."
379 }
380
381 fn notices(&self) -> &'static [&'static str] {
382 &[indoc! {"
383 This function performs synchronous blocking operations and is not recommended for
384 frequent or performance-critical workflows due to potential network-related delays.
385 "}]
386 }
387
388 fn category(&self) -> &'static str {
389 Category::System.as_ref()
390 }
391
392 fn return_kind(&self) -> u16 {
393 kind::BYTES
394 }
395
396 #[cfg(not(feature = "test"))]
397 fn examples(&self) -> &'static [Example] {
398 &[]
399 }
400
401 #[cfg(feature = "test")]
402 fn examples(&self) -> &'static [Example] {
403 &[
404 example! {
405 title: "Basic HTTP request",
406 source: r#"http_request("https://httpbin.org/get")"#,
407 result: Ok(
408 r#"{"args":{},"headers":{"Accept":"*/*","Host":"httpbin.org"},"url":"https://httpbin.org/get"}"#,
409 ),
410 },
411 example! {
412 title: "HTTP request with bearer token",
413 source: r#"http_request("https://httpbin.org/bearer", headers: {"Authorization": "Bearer my_token"})"#,
414 result: Ok(r#"{"authenticated":true,"token":"my_token"}"#),
415 },
416 example! {
417 title: "HTTP PUT request",
418 source: r#"http_request("https://httpbin.org/put", method: "put")"#,
419 result: Ok(r#"{"args":{},"data": "","url": "https://httpbin.org/put"}"#),
420 },
421 example! {
422 title: "HTTP POST request with body",
423 source: r#"http_request("https://httpbin.org/post", method: "post", body: "{\"data\":{\"hello\":\"world\"}}")"#,
424 result: Ok(r#"{"data":"{\"data\":{\"hello\":\"world\"}}"}"#),
425 },
426 ]
427 }
428
429 fn parameters(&self) -> &'static [Parameter] {
430 PARAMETERS.as_slice()
431 }
432
433 #[cfg(not(target_arch = "wasm32"))]
434 #[allow(clippy::similar_names)]
435 fn compile(
436 &self,
437 state: &state::TypeState,
438 _ctx: &mut FunctionCompileContext,
439 arguments: ArgumentList,
440 ) -> Compiled {
441 let url = arguments.required("url");
442 let method = arguments.optional("method");
443 let headers = arguments.optional("headers");
444 let body = arguments.optional("body");
445 let http_proxy = arguments.optional("http_proxy");
446 let https_proxy = arguments.optional("https_proxy");
447 let redact_headers = arguments.optional("redact_headers");
448
449 let client_or_proxies = ClientOrProxies::new(state, http_proxy, https_proxy)
450 .map_err(|err| Box::new(err) as Box<dyn DiagnosticMessage>)?;
451
452 Ok(HttpRequestFn {
453 url,
454 method,
455 headers,
456 body,
457 client_or_proxies,
458 redact_headers,
459 }
460 .as_expr())
461 }
462
463 #[cfg(target_arch = "wasm32")]
464 fn compile(
465 &self,
466 _state: &state::TypeState,
467 ctx: &mut FunctionCompileContext,
468 _arguments: ArgumentList,
469 ) -> Compiled {
470 Ok(
471 super::wasm_unsupported_function::WasmUnsupportedFunction::new(
472 ctx.span(),
473 TypeDef::bytes().fallible(),
474 )
475 .as_expr(),
476 )
477 }
478}
479
480#[cfg(all(feature = "test", test, not(target_arch = "wasm32")))]
481mod tests {
482 use super::*;
483 use crate::value;
484
485 fn execute_http_request(http_request_fn: &HttpRequestFn) -> Resolved {
486 let tz = TimeZone::default();
487 let mut object = value!({});
488 let mut runtime_state = state::RuntimeState::default();
489 let mut ctx = Context::new(&mut object, &mut runtime_state, &tz);
490 http_request_fn.resolve(&mut ctx)
491 }
492
493 #[tokio::test(flavor = "multi_thread")]
494 async fn test_basic_get_request() {
495 let func: HttpRequestFn = HttpRequestFn {
496 url: expr!("https://httpbin.org/get"),
497 method: Some(expr!("get")),
498 headers: Some(expr!({})),
499 body: Some(expr!("")),
500 client_or_proxies: ClientOrProxies::no_proxies(),
501 redact_headers: Some(expr!(true)),
502 };
503
504 let result = execute_http_request(&func).expect("HTTP request failed");
505
506 let body = result
507 .try_bytes_utf8_lossy()
508 .expect("Failed to convert response to string");
509 let response: serde_json::Value =
510 serde_json::from_str(body.as_ref()).expect("Failed to parse JSON");
511
512 assert!(response.get("url").is_some());
513 assert_eq!(response["url"], "https://httpbin.org/get");
514 }
515
516 #[tokio::test(flavor = "multi_thread")]
517 async fn test_malformed_url() {
518 let func = HttpRequestFn {
519 url: expr!("not-a-valid-url"),
520 method: Some(expr!("get")),
521 headers: Some(expr!({})),
522 body: Some(expr!("")),
523 client_or_proxies: ClientOrProxies::no_proxies(),
524 redact_headers: Some(expr!(true)),
525 };
526
527 let result = execute_http_request(&func);
528 assert!(result.is_err());
529 let error = result.unwrap_err();
530 assert!(error.to_string().contains("HTTP request failed"));
531 }
532
533 #[tokio::test(flavor = "multi_thread")]
534 async fn test_invalid_header() {
535 let func = HttpRequestFn {
536 url: expr!("https://httpbin.org/get"),
537 method: Some(expr!("get")),
538 headers: Some(expr!({"Invalid Header With Spaces": "value"})),
539 body: Some(expr!("")),
540 client_or_proxies: ClientOrProxies::no_proxies(),
541 redact_headers: Some(expr!(true)),
542 };
543
544 let result = execute_http_request(&func);
545 assert!(result.is_err());
546 let error = result.unwrap_err();
547 assert!(error.to_string().contains("Invalid header key"));
548 }
549
550 #[tokio::test(flavor = "multi_thread")]
551 async fn test_invalid_proxy() {
552 let func = HttpRequestFn {
553 url: expr!("https://httpbin.org/get"),
554 method: Some(expr!("get")),
555 headers: Some(expr!({})),
556 body: Some(expr!("")),
557 client_or_proxies: ClientOrProxies::new_proxies_no_const_resolve(
558 None,
559 Some(expr!("not^a&valid*url")),
560 ),
561 redact_headers: Some(expr!(true)),
562 };
563
564 let result = execute_http_request(&func);
565 assert!(result.is_err());
566 let error = result.unwrap_err();
567 assert!(error.to_string().contains("Invalid proxy"));
568 }
569
570 #[tokio::test(flavor = "multi_thread")]
571 async fn test_sensitive_headers_redacted() {
572 let func = HttpRequestFn {
573 url: expr!("not-a-valid-url"),
574 method: Some(expr!("get")),
575 headers: Some(expr!({
576 "Authorization": "Bearer super_secret_12345",
577 "X-Api-Key": "key-67890",
578 "Content-Type": "application/json",
579 "Cookie": "session=abcdef",
580 "X-Custom-Token": "another-secret",
581 "User-Agent": "VRL/0.28"
582 })),
583 body: Some(expr!("")),
584 client_or_proxies: ClientOrProxies::no_proxies(),
585 redact_headers: Some(expr!(true)),
586 };
587
588 let result = execute_http_request(&func);
589 assert!(result.is_err());
590 let error = result.unwrap_err().to_string();
591
592 assert!(
594 !error.contains("super_secret_12345"),
595 "Authorization token should be redacted"
596 );
597 assert!(!error.contains("key-67890"), "API key should be redacted");
598 assert!(!error.contains("abcdef"), "Cookie should be redacted");
599 assert!(
600 !error.contains("another-secret"),
601 "Custom token should be redacted"
602 );
603
604 assert!(error.contains("***"), "Should contain *** placeholder");
606
607 assert!(
609 error.contains("application/json"),
610 "Non-sensitive headers should not be redacted"
611 );
612 assert!(
613 error.contains("VRL/0.28"),
614 "User-Agent should not be redacted"
615 );
616 }
617
618 #[tokio::test(flavor = "multi_thread")]
619 async fn test_redact_headers_disabled() {
620 let func = HttpRequestFn {
621 url: expr!("not-a-valid-url"),
622 method: Some(expr!("get")),
623 headers: Some(expr!({
624 "Authorization": "Bearer super_secret_12345",
625 "Content-Type": "application/json"
626 })),
627 body: Some(expr!("")),
628 client_or_proxies: ClientOrProxies::no_proxies(),
629 redact_headers: Some(expr!(false)),
630 };
631
632 let result = execute_http_request(&func);
633 assert!(result.is_err());
634 let error = result.unwrap_err().to_string();
635
636 assert!(
638 error.contains("super_secret_12345"),
639 "Authorization token should not be redacted when redact_headers is false"
640 );
641 }
642}