vector/sources/http_client/
client.rs

1//! Generalized HTTP client source.
2//! Calls an endpoint at an interval, decoding the HTTP responses into events.
3
4use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14    TimeZone,
15    codecs::{
16        StreamDecodingError,
17        decoding::{DeserializerConfig, FramingConfig},
18    },
19    compile_vrl,
20    config::{LogNamespace, SourceOutput, log_schema},
21    configurable::configurable_component,
22    event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25    compiler::{CompileConfig, Function, Program, runtime::Runtime},
26    prelude::TypeState,
27};
28
29use crate::{
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext},
32    format_vrl_diagnostics,
33    http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34    serde::{default_decoding, default_framing_message_based},
35    sources,
36    sources::util::{
37        http::HttpMethod,
38        http_client,
39        http_client::{
40            GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41            default_timeout, warn_if_interval_too_low,
42        },
43    },
44    tls::{TlsConfig, TlsSettings},
45};
46
/// Configuration for the `http_client` source.
#[serde_as]
#[configurable_component(source(
    "http_client",
    "Pull observability data from an HTTP server at a configured interval."
))]
#[derive(Clone, Debug)]
pub struct HttpClientConfig {
    /// The HTTP endpoint to collect events from.
    ///
    /// The full path must be specified.
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
    pub endpoint: String,

    /// The interval between scrapes. Requests are run concurrently, so if a scrape takes longer
    /// than the interval, a new scrape is started anyway. This can consume extra resources; set
    /// the timeout to a value lower than the scrape interval to prevent this from happening.
    // Read from `scrape_interval_secs` as a whole number of seconds (`u64`).
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub interval: Duration,

    /// The timeout for each scrape request.
    // Read from `scrape_timeout_secs`; unlike the interval, fractional seconds (`f64`) are accepted.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    pub timeout: Duration,

    /// Custom parameters for the HTTP request query string.
    ///
    /// One or more values for the same parameter key can be provided.
    ///
    /// The parameters provided in this option are appended to any parameters
    /// manually provided in the `endpoint` option.
    ///
    /// VRL functions are supported within query parameters. You can
    /// use functions like `now()` to dynamically modify query
    /// parameter values.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "A query string parameter and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: QueryParameters,

    // Description is taken from `DeserializerConfig` itself (`derived`).
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// Framing to use in the decoding.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    /// Headers to apply to the HTTP requests.
    ///
    /// One or more values for the same header can be provided.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: HashMap<String, Vec<String>>,

    /// Specifies the method of the HTTP request.
    #[serde(default = "default_http_method")]
    pub method: HttpMethod,

    /// Raw data to send as the HTTP request body.
    ///
    /// Can be a static string or a VRL expression.
    ///
    /// When a body is provided, the `Content-Type` header is automatically set to
    /// `application/json` unless explicitly overridden in the `headers` configuration.
    // NOTE(review): the Content-Type behaviour above is implemented outside this file
    // (in the shared HTTP client utilities) — confirm it still holds when editing.
    #[serde(default)]
    pub body: Option<ParameterValue>,

    /// TLS configuration.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// HTTP Authentication.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
139
140const fn default_http_method() -> HttpMethod {
141    HttpMethod::Get
142}
143
144fn query_examples() -> QueryParameters {
145    HashMap::<_, _>::from_iter([
146        (
147            "field".to_owned(),
148            QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
149        ),
150        (
151            "fruit".to_owned(),
152            QueryParameterValue::MultiParams(vec![
153                ParameterValue::String("mango".to_owned()),
154                ParameterValue::String("papaya".to_owned()),
155                ParameterValue::String("kiwi".to_owned()),
156            ]),
157        ),
158        (
159            "start_time".to_owned(),
160            QueryParameterValue::SingleParam(ParameterValue::Typed {
161                value: "now()".to_owned(),
162                r#type: ParamType::Vrl,
163            }),
164        ),
165    ])
166}
167
/// Example value for the `headers` option shown in the generated documentation.
///
/// Shows both a standard header and a custom header carrying several values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    fn owned(values: &[&str]) -> Vec<String> {
        values.iter().copied().map(str::to_owned).collect()
    }

    HashMap::from([
        ("Accept".to_owned(), owned(&["text/plain", "text/html"])),
        (
            "X-My-Custom-Header".to_owned(),
            owned(&["a", "vector", "of", "values"]),
        ),
    ])
}
185
186/// Helper function to compile a VRL parameter value into a Program
187fn compile_parameter_vrl(
188    param: &ParameterValue,
189    functions: &[Box<dyn Function>],
190) -> Result<Option<Program>, sources::BuildError> {
191    if !param.is_vrl() {
192        return Ok(None);
193    }
194
195    let state = TypeState::default();
196    let config = CompileConfig::default();
197
198    match compile_vrl(param.value(), functions, &state, config) {
199        Ok(compilation_result) => {
200            if !compilation_result.warnings.is_empty() {
201                let warnings = format_vrl_diagnostics(param.value(), compilation_result.warnings);
202                warn!(message = "VRL compilation warnings.", %warnings);
203            }
204            Ok(Some(compilation_result.program))
205        }
206        Err(diagnostics) => {
207            let error = format_vrl_diagnostics(param.value(), diagnostics);
208            Err(sources::BuildError::VrlCompilationError {
209                message: format!("VRL compilation failed: {}", error),
210            })
211        }
212    }
213}
214
215impl Default for HttpClientConfig {
216    fn default() -> Self {
217        Self {
218            endpoint: "http://localhost:9898/logs".to_string(),
219            query: HashMap::new(),
220            interval: default_interval(),
221            timeout: default_timeout(),
222            decoding: default_decoding(),
223            framing: default_framing_message_based(),
224            headers: HashMap::new(),
225            method: default_http_method(),
226            body: None,
227            tls: None,
228            auth: None,
229            log_namespace: None,
230        }
231    }
232}
233
234impl_generate_config_from_default!(HttpClientConfig);
235
236#[derive(Clone)]
237pub struct CompiledParam {
238    value: String,
239    program: Option<Program>,
240}
241
242#[derive(Clone)]
243pub enum CompiledQueryParameterValue {
244    SingleParam(Box<CompiledParam>),
245    MultiParams(Vec<CompiledParam>),
246}
247
248impl CompiledQueryParameterValue {
249    fn has_vrl(&self) -> bool {
250        match self {
251            CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
252            CompiledQueryParameterValue::MultiParams(params) => {
253                params.iter().any(|p| p.program.is_some())
254            }
255        }
256    }
257}
258
259#[derive(Clone)]
260pub struct Query {
261    original: HashMap<String, QueryParameterValue>,
262    compiled: HashMap<String, CompiledQueryParameterValue>,
263    has_vrl: bool,
264}
265
266impl Query {
267    pub fn new(params: &HashMap<String, QueryParameterValue>) -> Result<Self, sources::BuildError> {
268        let functions = vector_vrl_functions::all();
269
270        let mut compiled: HashMap<String, CompiledQueryParameterValue> = HashMap::new();
271
272        for (k, v) in params.iter() {
273            let compiled_param = Self::compile_param(v, &functions)?;
274            compiled.insert(k.clone(), compiled_param);
275        }
276
277        let has_vrl = compiled.values().any(|v| v.has_vrl());
278
279        Ok(Query {
280            original: params.clone(),
281            compiled,
282            has_vrl,
283        })
284    }
285
286    fn compile_value(
287        param: &ParameterValue,
288        functions: &[Box<dyn Function>],
289    ) -> Result<CompiledParam, sources::BuildError> {
290        let program = compile_parameter_vrl(param, functions)?;
291
292        Ok(CompiledParam {
293            value: param.value().to_string(),
294            program,
295        })
296    }
297
298    fn compile_param(
299        value: &QueryParameterValue,
300        functions: &[Box<dyn Function>],
301    ) -> Result<CompiledQueryParameterValue, sources::BuildError> {
302        match value {
303            QueryParameterValue::SingleParam(param) => {
304                Ok(CompiledQueryParameterValue::SingleParam(Box::new(
305                    Self::compile_value(param, functions)?,
306                )))
307            }
308            QueryParameterValue::MultiParams(params) => {
309                let compiled = params
310                    .iter()
311                    .map(|p| Self::compile_value(p, functions))
312                    .collect::<Result<Vec<_>, _>>()?;
313                Ok(CompiledQueryParameterValue::MultiParams(compiled))
314            }
315        }
316    }
317}
318
319#[async_trait::async_trait]
320#[typetag::serde(name = "http_client")]
321impl SourceConfig for HttpClientConfig {
322    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
323        let query = Query::new(&self.query)?;
324        let functions = vector_vrl_functions::all();
325
326        // Compile body if present
327        let body = self
328            .body
329            .as_ref()
330            .map(|body_param| -> Result<CompiledParam, sources::BuildError> {
331                let program = compile_parameter_vrl(body_param, &functions)?;
332                Ok(CompiledParam {
333                    value: body_param.value().to_string(),
334                    program,
335                })
336            })
337            .transpose()?;
338
339        // Build the base URLs
340        let endpoints = [self.endpoint.clone()];
341        let urls: Vec<Uri> = endpoints
342            .iter()
343            .map(|s| {
344                let uri = s.parse::<Uri>().context(sources::UriParseSnafu)?;
345                // For URLs with VRL expressions, add query parameters dynamically during request
346                // For URLs without VRL expressions, add query parameters now
347                Ok(if query.has_vrl {
348                    uri
349                } else {
350                    build_url(&uri, &query.original)
351                })
352            })
353            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
354
355        let tls = TlsSettings::from_options(self.tls.as_ref())?;
356
357        let log_namespace = cx.log_namespace(self.log_namespace);
358
359        // build the decoder
360        let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
361
362        let content_type = self.decoding.content_type(&self.framing).to_string();
363
364        // Create context with the config for dynamic query parameter and body evaluation
365        let context = HttpClientContext {
366            decoder,
367            log_namespace,
368            query,
369            body,
370        };
371
372        warn_if_interval_too_low(self.timeout, self.interval);
373
374        let inputs = GenericHttpClientInputs {
375            urls,
376            interval: self.interval,
377            timeout: self.timeout,
378            headers: self.headers.clone(),
379            content_type,
380            auth: self.auth.clone(),
381            tls,
382            proxy: cx.proxy.clone(),
383            shutdown: cx.shutdown,
384        };
385
386        Ok(call(inputs, context, cx.out, self.method).boxed())
387    }
388
389    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
390        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
391        // and is merged here.
392        let log_namespace = global_log_namespace.merge(self.log_namespace);
393
394        let schema_definition = self
395            .decoding
396            .schema_definition(log_namespace)
397            .with_standard_vector_source_metadata();
398
399        vec![SourceOutput::new_maybe_logs(
400            self.decoding.output_type(),
401            schema_definition,
402        )]
403    }
404
405    fn can_acknowledge(&self) -> bool {
406        false
407    }
408}
409
410impl HttpClientConfig {
411    pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
412        let decoding = self.decoding.clone();
413        let framing = self.framing.clone();
414        let log_namespace =
415            log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
416
417        DecodingConfig::new(framing, decoding, log_namespace)
418    }
419}
420
421/// Captures the configuration options required to decode the incoming requests into events.
422#[derive(Clone)]
423pub struct HttpClientContext {
424    pub decoder: Decoder,
425    pub log_namespace: LogNamespace,
426    query: Query,
427    body: Option<CompiledParam>,
428}
429
430impl HttpClientContext {
431    /// Decode the events from the byte buffer
432    fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
433        let mut events = Vec::new();
434        loop {
435            match self.decoder.decode_eof(buf) {
436                Ok(Some((next, _))) => {
437                    events.extend(next);
438                }
439                Ok(None) => break,
440                Err(error) => {
441                    // Error is logged by `vector_lib::codecs::Decoder`, no further
442                    // handling is needed here.
443                    if !error.can_continue() {
444                        break;
445                    }
446                    break;
447                }
448            }
449        }
450        events
451    }
452}
453
454impl HttpClientBuilder for HttpClientContext {
455    type Context = HttpClientContext;
456
457    /// No additional context from request data is needed from this particular client.
458    fn build(&self, _uri: &Uri) -> Self::Context {
459        self.clone()
460    }
461}
462
463fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
464    let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
465    let timezone = TimeZone::default();
466
467    Runtime::default()
468        .resolve(&mut target, program, &timezone)
469        .map_err(|error| {
470            warn!(message = "VRL runtime error.", source = %value, %error);
471        })
472        .ok()
473        .and_then(|vrl_value| {
474            let json_value = serde_json::to_value(vrl_value).ok()?;
475
476            // Properly handle VRL values, so that key1: `upcase("foo")` will resolve
477            // properly as endpoint.com/key1=FOO and not endpoint.com/key1="FOO"
478            // similarly, `now()` should resolve to endpoint.com/key1=2025-06-07T10:39:08.662735Z
479            // and not endpoint.com/key1=t'2025-06-07T10:39:08.662735Z'
480            let resolved_string = match json_value {
481                serde_json::Value::String(s) => s,
482                value => value.to_string(),
483            };
484            Some(resolved_string)
485        })
486}
487
488/// Resolve a compiled parameter, handling VRL evaluation if present
489fn resolve_compiled_param(compiled: &CompiledParam) -> Option<String> {
490    match &compiled.program {
491        Some(program) => resolve_vrl(&compiled.value, program),
492        None => Some(compiled.value.clone()),
493    }
494}
495
496impl http_client::HttpClientContext for HttpClientContext {
497    /// Decodes the HTTP response body into events per the decoder configured.
498    fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
499        // get the body into a byte array
500        let mut buf = BytesMut::new();
501        buf.extend_from_slice(body);
502
503        let events = self.decode_events(&mut buf);
504
505        Some(events)
506    }
507
508    /// Get the request body to send with the HTTP request
509    fn get_request_body(&self) -> Option<String> {
510        self.body.as_ref().and_then(resolve_compiled_param)
511    }
512
513    /// Process the URL dynamically before each request
514    fn process_url(&self, url: &Uri) -> Option<Uri> {
515        if !self.query.has_vrl {
516            return None;
517        }
518
519        // Resolve all query parameters with VRL expressions
520        let processed_query: Option<HashMap<_, _>> = self
521            .query
522            .compiled
523            .iter()
524            .map(|(name, value)| {
525                let resolved = match value {
526                    CompiledQueryParameterValue::SingleParam(param) => {
527                        let result = resolve_compiled_param(param)?;
528                        QueryParameterValue::SingleParam(ParameterValue::String(result))
529                    }
530                    CompiledQueryParameterValue::MultiParams(params) => {
531                        let results: Option<Vec<_>> = params
532                            .iter()
533                            .map(|p| resolve_compiled_param(p).map(ParameterValue::String))
534                            .collect();
535                        QueryParameterValue::MultiParams(results?)
536                    }
537                };
538                Some((name.clone(), resolved))
539            })
540            .collect();
541
542        // Build base URI and add query parameters
543        let base_uri = Uri::builder()
544            .scheme(
545                url.scheme()
546                    .cloned()
547                    .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
548            )
549            .authority(
550                url.authority()
551                    .cloned()
552                    .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
553            )
554            .path_and_query(url.path().to_string())
555            .build()
556            .ok()?;
557
558        Some(build_url(&base_uri, &processed_query?))
559    }
560
561    /// Enriches events with source_type, timestamp
562    fn enrich_events(&mut self, events: &mut Vec<Event>) {
563        let now = Utc::now();
564
565        for event in events {
566            match event {
567                Event::Log(log) => {
568                    self.log_namespace.insert_standard_vector_source_metadata(
569                        log,
570                        HttpClientConfig::NAME,
571                        now,
572                    );
573                }
574                Event::Metric(metric) => {
575                    if let Some(source_type_key) = log_schema().source_type_key() {
576                        metric.replace_tag(
577                            source_type_key.to_string(),
578                            HttpClientConfig::NAME.to_string(),
579                        );
580                    }
581                }
582                Event::Trace(trace) => {
583                    trace.maybe_insert(log_schema().source_type_key_target_path(), || {
584                        Bytes::from(HttpClientConfig::NAME).into()
585                    });
586                }
587            }
588        }
589    }
590}