vector/sources/http_client/
client.rs

1//! Generalized HTTP client source.
2//! Calls an endpoint at an interval, decoding the HTTP responses into events.
3
4use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14    TimeZone,
15    codecs::{
16        StreamDecodingError,
17        decoding::{DeserializerConfig, FramingConfig},
18    },
19    compile_vrl,
20    config::{LogNamespace, SourceOutput, log_schema},
21    configurable::configurable_component,
22    event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25    compiler::{CompileConfig, Function, Program, runtime::Runtime},
26    prelude::TypeState,
27};
28
29use crate::{
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext},
32    format_vrl_diagnostics,
33    http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34    serde::{default_decoding, default_framing_message_based},
35    sources,
36    sources::util::{
37        http::HttpMethod,
38        http_client,
39        http_client::{
40            GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41            default_timeout, warn_if_interval_too_low,
42        },
43    },
44    tls::{TlsConfig, TlsSettings},
45};
46
47/// Configuration for the `http_client` source.
48#[serde_as]
49#[configurable_component(source(
50    "http_client",
51    "Pull observability data from an HTTP server at a configured interval."
52))]
53#[derive(Clone, Debug)]
54pub struct HttpClientConfig {
55    /// The HTTP endpoint to collect events from.
56    ///
57    /// The full path must be specified.
58    #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
59    pub endpoint: String,
60
61    /// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
62    /// than the interval a new scrape will be started. This can take extra resources, set the timeout
63    /// to a value lower than the scrape interval to prevent this from happening.
64    #[serde(default = "default_interval")]
65    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66    #[serde(rename = "scrape_interval_secs")]
67    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
68    pub interval: Duration,
69
70    /// The timeout for each scrape request.
71    #[serde(default = "default_timeout")]
72    #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
73    #[serde(rename = "scrape_timeout_secs")]
74    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
75    pub timeout: Duration,
76
77    /// Custom parameters for the HTTP request query string.
78    ///
79    /// One or more values for the same parameter key can be provided.
80    ///
81    /// The parameters provided in this option are appended to any parameters
82    /// manually provided in the `endpoint` option.
83    ///
84    /// VRL functions are supported within query parameters. You can
85    /// use functions like `now()` to dynamically modify query
86    /// parameter values.
87    #[serde(default)]
88    #[configurable(metadata(
89        docs::additional_props_description = "A query string parameter and its value(s)."
90    ))]
91    #[configurable(metadata(docs::examples = "query_examples()"))]
92    pub query: QueryParameters,
93
94    #[configurable(derived)]
95    #[serde(default = "default_decoding")]
96    pub decoding: DeserializerConfig,
97
98    /// Framing to use in the decoding.
99    #[configurable(derived)]
100    #[serde(default = "default_framing_message_based")]
101    pub framing: FramingConfig,
102
103    /// Headers to apply to the HTTP requests.
104    ///
105    /// One or more values for the same header can be provided.
106    #[serde(default)]
107    #[configurable(metadata(
108        docs::additional_props_description = "An HTTP request header and its value(s)."
109    ))]
110    #[configurable(metadata(docs::examples = "headers_examples()"))]
111    pub headers: HashMap<String, Vec<String>>,
112
113    /// Specifies the method of the HTTP request.
114    #[serde(default = "default_http_method")]
115    pub method: HttpMethod,
116
117    /// Raw data to send as the HTTP request body.
118    ///
119    /// Can be a static string or a VRL expression.
120    ///
121    /// When a body is provided, the `Content-Type` header is automatically set to
122    /// `application/json` unless explicitly overridden in the `headers` configuration.
123    #[serde(default)]
124    pub body: Option<ParameterValue>,
125
126    /// TLS configuration.
127    #[configurable(derived)]
128    pub tls: Option<TlsConfig>,
129
130    /// HTTP Authentication.
131    #[configurable(derived)]
132    pub auth: Option<Auth>,
133
134    /// The namespace to use for logs. This overrides the global setting.
135    #[configurable(metadata(docs::hidden))]
136    #[serde(default)]
137    pub log_namespace: Option<bool>,
138}
139
140const fn default_http_method() -> HttpMethod {
141    HttpMethod::Get
142}
143
144fn query_examples() -> QueryParameters {
145    HashMap::<_, _>::from_iter([
146        (
147            "field".to_owned(),
148            QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
149        ),
150        (
151            "fruit".to_owned(),
152            QueryParameterValue::MultiParams(vec![
153                ParameterValue::String("mango".to_owned()),
154                ParameterValue::String("papaya".to_owned()),
155                ParameterValue::String("kiwi".to_owned()),
156            ]),
157        ),
158        (
159            "start_time".to_owned(),
160            QueryParameterValue::SingleParam(ParameterValue::Typed {
161                value: "now()".to_owned(),
162                r#type: ParamType::Vrl,
163            }),
164        ),
165    ])
166}
167
/// Example `headers` configuration for the generated documentation, showing that a single
/// header may carry several values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    let owned = |values: &[&str]| -> Vec<String> {
        values.iter().map(|value| (*value).to_owned()).collect()
    };

    let mut examples = HashMap::new();
    examples.insert("Accept".to_owned(), owned(&["text/plain", "text/html"]));
    examples.insert(
        "X-My-Custom-Header".to_owned(),
        owned(&["a", "vector", "of", "values"]),
    );
    examples
}
185
186/// Helper function to get all VRL functions for compilation
187fn get_vrl_functions() -> Vec<Box<dyn Function>> {
188    vrl::stdlib::all()
189        .into_iter()
190        .chain(vector_lib::enrichment::vrl_functions())
191        .chain(vector_vrl_functions::all())
192        .collect()
193}
194
195/// Helper function to compile a VRL parameter value into a Program
196fn compile_parameter_vrl(
197    param: &ParameterValue,
198    functions: &[Box<dyn Function>],
199) -> Result<Option<Program>, sources::BuildError> {
200    if !param.is_vrl() {
201        return Ok(None);
202    }
203
204    let state = TypeState::default();
205    let config = CompileConfig::default();
206
207    match compile_vrl(param.value(), functions, &state, config) {
208        Ok(compilation_result) => {
209            if !compilation_result.warnings.is_empty() {
210                let warnings = format_vrl_diagnostics(param.value(), compilation_result.warnings);
211                warn!(message = "VRL compilation warnings.", %warnings);
212            }
213            Ok(Some(compilation_result.program))
214        }
215        Err(diagnostics) => {
216            let error = format_vrl_diagnostics(param.value(), diagnostics);
217            Err(sources::BuildError::VrlCompilationError {
218                message: format!("VRL compilation failed: {}", error),
219            })
220        }
221    }
222}
223
224impl Default for HttpClientConfig {
225    fn default() -> Self {
226        Self {
227            endpoint: "http://localhost:9898/logs".to_string(),
228            query: HashMap::new(),
229            interval: default_interval(),
230            timeout: default_timeout(),
231            decoding: default_decoding(),
232            framing: default_framing_message_based(),
233            headers: HashMap::new(),
234            method: default_http_method(),
235            body: None,
236            tls: None,
237            auth: None,
238            log_namespace: None,
239        }
240    }
241}
242
243impl_generate_config_from_default!(HttpClientConfig);
244
245#[derive(Clone)]
246pub struct CompiledParam {
247    value: String,
248    program: Option<Program>,
249}
250
251#[derive(Clone)]
252pub enum CompiledQueryParameterValue {
253    SingleParam(Box<CompiledParam>),
254    MultiParams(Vec<CompiledParam>),
255}
256
257impl CompiledQueryParameterValue {
258    fn has_vrl(&self) -> bool {
259        match self {
260            CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
261            CompiledQueryParameterValue::MultiParams(params) => {
262                params.iter().any(|p| p.program.is_some())
263            }
264        }
265    }
266}
267
268#[derive(Clone)]
269pub struct Query {
270    original: HashMap<String, QueryParameterValue>,
271    compiled: HashMap<String, CompiledQueryParameterValue>,
272    has_vrl: bool,
273}
274
275impl Query {
276    pub fn new(params: &HashMap<String, QueryParameterValue>) -> Result<Self, sources::BuildError> {
277        let functions = get_vrl_functions();
278
279        let mut compiled: HashMap<String, CompiledQueryParameterValue> = HashMap::new();
280
281        for (k, v) in params.iter() {
282            let compiled_param = Self::compile_param(v, &functions)?;
283            compiled.insert(k.clone(), compiled_param);
284        }
285
286        let has_vrl = compiled.values().any(|v| v.has_vrl());
287
288        Ok(Query {
289            original: params.clone(),
290            compiled,
291            has_vrl,
292        })
293    }
294
295    fn compile_value(
296        param: &ParameterValue,
297        functions: &[Box<dyn Function>],
298    ) -> Result<CompiledParam, sources::BuildError> {
299        let program = compile_parameter_vrl(param, functions)?;
300
301        Ok(CompiledParam {
302            value: param.value().to_string(),
303            program,
304        })
305    }
306
307    fn compile_param(
308        value: &QueryParameterValue,
309        functions: &[Box<dyn Function>],
310    ) -> Result<CompiledQueryParameterValue, sources::BuildError> {
311        match value {
312            QueryParameterValue::SingleParam(param) => {
313                Ok(CompiledQueryParameterValue::SingleParam(Box::new(
314                    Self::compile_value(param, functions)?,
315                )))
316            }
317            QueryParameterValue::MultiParams(params) => {
318                let compiled = params
319                    .iter()
320                    .map(|p| Self::compile_value(p, functions))
321                    .collect::<Result<Vec<_>, _>>()?;
322                Ok(CompiledQueryParameterValue::MultiParams(compiled))
323            }
324        }
325    }
326}
327
328#[async_trait::async_trait]
329#[typetag::serde(name = "http_client")]
330impl SourceConfig for HttpClientConfig {
331    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
332        let query = Query::new(&self.query)?;
333        let functions = get_vrl_functions();
334
335        // Compile body if present
336        let body = self
337            .body
338            .as_ref()
339            .map(|body_param| -> Result<CompiledParam, sources::BuildError> {
340                let program = compile_parameter_vrl(body_param, &functions)?;
341                Ok(CompiledParam {
342                    value: body_param.value().to_string(),
343                    program,
344                })
345            })
346            .transpose()?;
347
348        // Build the base URLs
349        let endpoints = [self.endpoint.clone()];
350        let urls: Vec<Uri> = endpoints
351            .iter()
352            .map(|s| {
353                let uri = s.parse::<Uri>().context(sources::UriParseSnafu)?;
354                // For URLs with VRL expressions, add query parameters dynamically during request
355                // For URLs without VRL expressions, add query parameters now
356                Ok(if query.has_vrl {
357                    uri
358                } else {
359                    build_url(&uri, &query.original)
360                })
361            })
362            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
363
364        let tls = TlsSettings::from_options(self.tls.as_ref())?;
365
366        let log_namespace = cx.log_namespace(self.log_namespace);
367
368        // build the decoder
369        let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
370
371        let content_type = self.decoding.content_type(&self.framing).to_string();
372
373        // Create context with the config for dynamic query parameter and body evaluation
374        let context = HttpClientContext {
375            decoder,
376            log_namespace,
377            query,
378            body,
379        };
380
381        warn_if_interval_too_low(self.timeout, self.interval);
382
383        let inputs = GenericHttpClientInputs {
384            urls,
385            interval: self.interval,
386            timeout: self.timeout,
387            headers: self.headers.clone(),
388            content_type,
389            auth: self.auth.clone(),
390            tls,
391            proxy: cx.proxy.clone(),
392            shutdown: cx.shutdown,
393        };
394
395        Ok(call(inputs, context, cx.out, self.method).boxed())
396    }
397
398    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
399        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
400        // and is merged here.
401        let log_namespace = global_log_namespace.merge(self.log_namespace);
402
403        let schema_definition = self
404            .decoding
405            .schema_definition(log_namespace)
406            .with_standard_vector_source_metadata();
407
408        vec![SourceOutput::new_maybe_logs(
409            self.decoding.output_type(),
410            schema_definition,
411        )]
412    }
413
414    fn can_acknowledge(&self) -> bool {
415        false
416    }
417}
418
419impl HttpClientConfig {
420    pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
421        let decoding = self.decoding.clone();
422        let framing = self.framing.clone();
423        let log_namespace =
424            log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
425
426        DecodingConfig::new(framing, decoding, log_namespace)
427    }
428}
429
430/// Captures the configuration options required to decode the incoming requests into events.
431#[derive(Clone)]
432pub struct HttpClientContext {
433    pub decoder: Decoder,
434    pub log_namespace: LogNamespace,
435    query: Query,
436    body: Option<CompiledParam>,
437}
438
439impl HttpClientContext {
440    /// Decode the events from the byte buffer
441    fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
442        let mut events = Vec::new();
443        loop {
444            match self.decoder.decode_eof(buf) {
445                Ok(Some((next, _))) => {
446                    events.extend(next);
447                }
448                Ok(None) => break,
449                Err(error) => {
450                    // Error is logged by `crate::codecs::Decoder`, no further
451                    // handling is needed here.
452                    if !error.can_continue() {
453                        break;
454                    }
455                    break;
456                }
457            }
458        }
459        events
460    }
461}
462
463impl HttpClientBuilder for HttpClientContext {
464    type Context = HttpClientContext;
465
466    /// No additional context from request data is needed from this particular client.
467    fn build(&self, _uri: &Uri) -> Self::Context {
468        self.clone()
469    }
470}
471
472fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
473    let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
474    let timezone = TimeZone::default();
475
476    Runtime::default()
477        .resolve(&mut target, program, &timezone)
478        .map_err(|error| {
479            warn!(message = "VRL runtime error.", source = %value, %error);
480        })
481        .ok()
482        .and_then(|vrl_value| {
483            let json_value = serde_json::to_value(vrl_value).ok()?;
484
485            // Properly handle VRL values, so that key1: `upcase("foo")` will resolve
486            // properly as endpoint.com/key1=FOO and not endpoint.com/key1="FOO"
487            // similarly, `now()` should resolve to endpoint.com/key1=2025-06-07T10:39:08.662735Z
488            // and not endpoint.com/key1=t'2025-06-07T10:39:08.662735Z'
489            let resolved_string = match json_value {
490                serde_json::Value::String(s) => s,
491                value => value.to_string(),
492            };
493            Some(resolved_string)
494        })
495}
496
497/// Resolve a compiled parameter, handling VRL evaluation if present
498fn resolve_compiled_param(compiled: &CompiledParam) -> Option<String> {
499    match &compiled.program {
500        Some(program) => resolve_vrl(&compiled.value, program),
501        None => Some(compiled.value.clone()),
502    }
503}
504
505impl http_client::HttpClientContext for HttpClientContext {
506    /// Decodes the HTTP response body into events per the decoder configured.
507    fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
508        // get the body into a byte array
509        let mut buf = BytesMut::new();
510        buf.extend_from_slice(body);
511
512        let events = self.decode_events(&mut buf);
513
514        Some(events)
515    }
516
517    /// Get the request body to send with the HTTP request
518    fn get_request_body(&self) -> Option<String> {
519        self.body.as_ref().and_then(resolve_compiled_param)
520    }
521
522    /// Process the URL dynamically before each request
523    fn process_url(&self, url: &Uri) -> Option<Uri> {
524        if !self.query.has_vrl {
525            return None;
526        }
527
528        // Resolve all query parameters with VRL expressions
529        let processed_query: Option<HashMap<_, _>> = self
530            .query
531            .compiled
532            .iter()
533            .map(|(name, value)| {
534                let resolved = match value {
535                    CompiledQueryParameterValue::SingleParam(param) => {
536                        let result = resolve_compiled_param(param)?;
537                        QueryParameterValue::SingleParam(ParameterValue::String(result))
538                    }
539                    CompiledQueryParameterValue::MultiParams(params) => {
540                        let results: Option<Vec<_>> = params
541                            .iter()
542                            .map(|p| resolve_compiled_param(p).map(ParameterValue::String))
543                            .collect();
544                        QueryParameterValue::MultiParams(results?)
545                    }
546                };
547                Some((name.clone(), resolved))
548            })
549            .collect();
550
551        // Build base URI and add query parameters
552        let base_uri = Uri::builder()
553            .scheme(
554                url.scheme()
555                    .cloned()
556                    .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
557            )
558            .authority(
559                url.authority()
560                    .cloned()
561                    .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
562            )
563            .path_and_query(url.path().to_string())
564            .build()
565            .ok()?;
566
567        Some(build_url(&base_uri, &processed_query?))
568    }
569
570    /// Enriches events with source_type, timestamp
571    fn enrich_events(&mut self, events: &mut Vec<Event>) {
572        let now = Utc::now();
573
574        for event in events {
575            match event {
576                Event::Log(log) => {
577                    self.log_namespace.insert_standard_vector_source_metadata(
578                        log,
579                        HttpClientConfig::NAME,
580                        now,
581                    );
582                }
583                Event::Metric(metric) => {
584                    if let Some(source_type_key) = log_schema().source_type_key() {
585                        metric.replace_tag(
586                            source_type_key.to_string(),
587                            HttpClientConfig::NAME.to_string(),
588                        );
589                    }
590                }
591                Event::Trace(trace) => {
592                    trace.maybe_insert(log_schema().source_type_key_target_path(), || {
593                        Bytes::from(HttpClientConfig::NAME).into()
594                    });
595                }
596            }
597        }
598    }
599}