vector/sources/http_client/
client.rs

1//! Generalized HTTP client source.
2//! Calls an endpoint at an interval, decoding the HTTP responses into events.
3
4use bytes::{Bytes, BytesMut};
5use chrono::Utc;
6use futures_util::FutureExt;
7use http::{response::Parts, Uri};
8use serde_with::serde_as;
9use snafu::ResultExt;
10use std::{collections::HashMap, time::Duration};
11use tokio_util::codec::Decoder as _;
12use vrl::diagnostic::Formatter;
13
14use crate::http::{ParamType, ParameterValue, QueryParameterValue, QueryParameters};
15use crate::sources::util::http_client;
16use crate::{
17    codecs::{Decoder, DecodingConfig},
18    config::{SourceConfig, SourceContext},
19    http::Auth,
20    serde::{default_decoding, default_framing_message_based},
21    sources,
22    sources::util::{
23        http::HttpMethod,
24        http_client::{
25            build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
26            GenericHttpClientInputs, HttpClientBuilder,
27        },
28    },
29    tls::{TlsConfig, TlsSettings},
30};
31use vector_lib::codecs::{
32    decoding::{DeserializerConfig, FramingConfig},
33    StreamDecodingError,
34};
35use vector_lib::config::{log_schema, LogNamespace, SourceOutput};
36use vector_lib::configurable::configurable_component;
37use vector_lib::{
38    compile_vrl,
39    event::{Event, LogEvent, VrlTarget},
40    TimeZone,
41};
42use vrl::{
43    compiler::{runtime::Runtime, CompileConfig, Function, Program},
44    prelude::TypeState,
45};
46
// NOTE(review): the `///` comments in this struct appear to double as the user-facing option
// descriptions (see the `#[configurable(...)]` metadata), so maintainer-only notes are written
// as plain `//` comments to keep them out of any generated documentation.
/// Configuration for the `http_client` source.
#[serde_as]
#[configurable_component(source(
    "http_client",
    "Pull observability data from an HTTP server at a configured interval."
))]
#[derive(Clone, Debug)]
pub struct HttpClientConfig {
    /// The HTTP endpoint to collect events from.
    ///
    /// The full path must be specified.
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
    pub endpoint: String,

    // Read from the `scrape_interval_secs` key as a whole number of seconds; falls back to
    // `default_interval()` when the key is absent.
    /// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
    /// than the interval a new scrape will be started. This can take extra resources, set the timeout
    /// to a value lower than the scrape interval to prevent this from happening.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub interval: Duration,

    // Read from the `scrape_timeout_secs` key; unlike the interval, fractional seconds are
    // accepted (`DurationSecondsWithFrac<f64>`). Falls back to `default_timeout()`.
    /// The timeout for each scrape request.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    pub timeout: Duration,

    // Defaults to an empty map. Compiled into a `Query` when the source is built, so VRL-typed
    // values can be re-evaluated for every request.
    /// Custom parameters for the HTTP request query string.
    ///
    /// One or more values for the same parameter key can be provided.
    ///
    /// The parameters provided in this option are appended to any parameters
    /// manually provided in the `endpoint` option.
    ///
    /// VRL functions are supported within query parameters. You can
    /// use functions like `now()` to dynamically modify query
    /// parameter values.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "A query string parameter and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: QueryParameters,

    // Deserializer configuration; paired with `framing` in `get_decoding_config` and also used
    // to derive the request content type and the output schema. Defaults to `default_decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    // Defaults to `default_framing_message_based()`.
    /// Framing to use in the decoding.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    // Defaults to an empty map; each header name maps to the list of values to send.
    /// Headers to apply to the HTTP requests.
    ///
    /// One or more values for the same header can be provided.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: HashMap<String, Vec<String>>,

    // Defaults to `GET` via `default_http_method()`.
    /// Specifies the method of the HTTP request.
    #[serde(default = "default_http_method")]
    pub method: HttpMethod,

    // `None` is passed through to `TlsSettings::from_options` when the source is built.
    /// TLS configuration.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // `None` means no authentication is configured for the requests.
    /// HTTP Authentication.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Hidden from the generated docs (`docs::hidden`). `None` defers to the global setting.
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
130
/// Serde default for [`HttpClientConfig::method`]: requests use `GET` unless configured otherwise.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Get
}
134
135fn query_examples() -> QueryParameters {
136    HashMap::<_, _>::from_iter([
137        (
138            "field".to_owned(),
139            QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
140        ),
141        (
142            "fruit".to_owned(),
143            QueryParameterValue::MultiParams(vec![
144                ParameterValue::String("mango".to_owned()),
145                ParameterValue::String("papaya".to_owned()),
146                ParameterValue::String("kiwi".to_owned()),
147            ]),
148        ),
149        (
150            "start_time".to_owned(),
151            QueryParameterValue::SingleParam(ParameterValue::Typed {
152                value: "now()".to_owned(),
153                r#type: ParamType::Vrl,
154            }),
155        ),
156    ])
157}
158
/// Example value for the `headers` option, referenced from the option's `docs::examples`
/// metadata: two headers, each carrying more than one value.
fn headers_examples() -> HashMap<String, Vec<String>> {
    // Turns a list of string literals into the owned values the map stores.
    fn owned(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| (*value).to_owned()).collect()
    }

    HashMap::from([
        ("Accept".to_owned(), owned(&["text/plain", "text/html"])),
        (
            "X-My-Custom-Header".to_owned(),
            owned(&["a", "vector", "of", "values"]),
        ),
    ])
}
176
177impl Default for HttpClientConfig {
178    fn default() -> Self {
179        Self {
180            endpoint: "http://localhost:9898/logs".to_string(),
181            query: HashMap::new(),
182            interval: default_interval(),
183            timeout: default_timeout(),
184            decoding: default_decoding(),
185            framing: default_framing_message_based(),
186            headers: HashMap::new(),
187            method: default_http_method(),
188            tls: None,
189            auth: None,
190            log_namespace: None,
191        }
192    }
193}
194
195impl_generate_config_from_default!(HttpClientConfig);
196
/// A single query parameter value together with its compiled VRL program, if it has one.
#[derive(Clone)]
pub struct CompiledParam {
    /// The raw configured value: either the literal text or the VRL source.
    value: String,
    /// The compiled VRL program. `None` when the value is not VRL, or when compilation failed.
    program: Option<Program>,
}
202
/// Compiled counterpart of `QueryParameterValue`, mirroring its single/multi value shapes.
#[derive(Clone)]
pub enum CompiledQueryParameterValue {
    /// A key configured with exactly one value.
    SingleParam(Box<CompiledParam>),
    /// A key configured with a list of values.
    MultiParams(Vec<CompiledParam>),
}
208
/// Query parameters of the source, kept both as configured and in compiled form.
#[derive(Clone)]
pub struct Query {
    /// The parameters exactly as configured; used to build the URL once when no VRL is involved.
    original: HashMap<String, QueryParameterValue>,
    /// The same parameters with VRL values compiled, keyed by parameter name.
    compiled: HashMap<String, CompiledQueryParameterValue>,
    /// `true` when at least one compiled value carries a VRL program.
    has_vrl: bool,
}
215
216impl Query {
217    pub fn new(params: &HashMap<String, QueryParameterValue>) -> Self {
218        let functions = vrl::stdlib::all()
219            .into_iter()
220            .chain(vector_lib::enrichment::vrl_functions())
221            .chain(vector_vrl_functions::all())
222            .collect::<Vec<_>>();
223
224        let compiled: HashMap<String, CompiledQueryParameterValue> = params
225            .iter()
226            .map(|(k, v)| (k.clone(), Self::compile_param(v, &functions)))
227            .collect();
228
229        let has_vrl = compiled.values().any(|compiled| match compiled {
230            CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
231            CompiledQueryParameterValue::MultiParams(params) => {
232                params.iter().any(|p| p.program.is_some())
233            }
234        });
235
236        Query {
237            original: params.clone(),
238            compiled,
239            has_vrl,
240        }
241    }
242
243    fn compile_value(param: &ParameterValue, functions: &[Box<dyn Function>]) -> CompiledParam {
244        let program = if param.is_vrl() {
245            let state = TypeState::default();
246            let config = CompileConfig::default();
247
248            match compile_vrl(param.value(), functions, &state, config) {
249                Ok(compilation_result) => {
250                    if !compilation_result.warnings.is_empty() {
251                        let warnings = Formatter::new(param.value(), compilation_result.warnings)
252                            .colored()
253                            .to_string();
254                        warn!(message = "VRL compilation warnings.", %warnings, internal_log_rate_limit = true);
255                    }
256                    Some(compilation_result.program)
257                }
258                Err(diagnostics) => {
259                    let error = Formatter::new(param.value(), diagnostics)
260                        .colored()
261                        .to_string();
262                    warn!(message = "VRL compilation failed.", %error, internal_log_rate_limit = true);
263                    None
264                }
265            }
266        } else {
267            None
268        };
269
270        CompiledParam {
271            value: param.value().to_string(),
272            program,
273        }
274    }
275
276    fn compile_param(
277        value: &QueryParameterValue,
278        functions: &[Box<dyn Function>],
279    ) -> CompiledQueryParameterValue {
280        match value {
281            QueryParameterValue::SingleParam(param) => CompiledQueryParameterValue::SingleParam(
282                Box::new(Self::compile_value(param, functions)),
283            ),
284            QueryParameterValue::MultiParams(params) => {
285                let compiled = params
286                    .iter()
287                    .map(|p| Self::compile_value(p, functions))
288                    .collect();
289                CompiledQueryParameterValue::MultiParams(compiled)
290            }
291        }
292    }
293}
294
295#[async_trait::async_trait]
296#[typetag::serde(name = "http_client")]
297impl SourceConfig for HttpClientConfig {
298    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
299        let query = Query::new(&self.query.clone());
300
301        // Build the base URLs
302        let endpoints = [self.endpoint.clone()];
303        let urls: Vec<Uri> = endpoints
304            .iter()
305            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
306            .map(|r| {
307                if query.has_vrl {
308                    // For URLs with VRL expressions, don't add query parameters here
309                    // They'll be added dynamically during the HTTP request
310                    r
311                } else {
312                    // For URLs without VRL expressions, add query parameters now
313                    r.map(|uri| build_url(&uri, &query.original))
314                }
315            })
316            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
317
318        let tls = TlsSettings::from_options(self.tls.as_ref())?;
319
320        let log_namespace = cx.log_namespace(self.log_namespace);
321
322        // build the decoder
323        let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
324
325        let content_type = self.decoding.content_type(&self.framing).to_string();
326
327        // Create context with the config for dynamic query parameter evaluation
328        let context = HttpClientContext {
329            decoder,
330            log_namespace,
331            query,
332        };
333
334        warn_if_interval_too_low(self.timeout, self.interval);
335
336        let inputs = GenericHttpClientInputs {
337            urls,
338            interval: self.interval,
339            timeout: self.timeout,
340            headers: self.headers.clone(),
341            content_type,
342            auth: self.auth.clone(),
343            tls,
344            proxy: cx.proxy.clone(),
345            shutdown: cx.shutdown,
346        };
347
348        Ok(call(inputs, context, cx.out, self.method).boxed())
349    }
350
351    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
352        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
353        // and is merged here.
354        let log_namespace = global_log_namespace.merge(self.log_namespace);
355
356        let schema_definition = self
357            .decoding
358            .schema_definition(log_namespace)
359            .with_standard_vector_source_metadata();
360
361        vec![SourceOutput::new_maybe_logs(
362            self.decoding.output_type(),
363            schema_definition,
364        )]
365    }
366
367    fn can_acknowledge(&self) -> bool {
368        false
369    }
370}
371
372impl HttpClientConfig {
373    pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
374        let decoding = self.decoding.clone();
375        let framing = self.framing.clone();
376        let log_namespace =
377            log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
378
379        DecodingConfig::new(framing, decoding, log_namespace)
380    }
381}
382
/// Captures the state required to decode the incoming HTTP responses into events and to build
/// the per-request URL.
#[derive(Clone)]
pub struct HttpClientContext {
    /// Decoder used to turn a response body into events.
    pub decoder: Decoder,
    /// Log namespace used when attaching the standard source metadata to log events.
    pub log_namespace: LogNamespace,
    /// Compiled query parameters; re-evaluated for each request when they contain VRL.
    query: Query,
}
390
391impl HttpClientContext {
392    /// Decode the events from the byte buffer
393    fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
394        let mut events = Vec::new();
395        loop {
396            match self.decoder.decode_eof(buf) {
397                Ok(Some((next, _))) => {
398                    events.extend(next);
399                }
400                Ok(None) => break,
401                Err(error) => {
402                    // Error is logged by `crate::codecs::Decoder`, no further
403                    // handling is needed here.
404                    if !error.can_continue() {
405                        break;
406                    }
407                    break;
408                }
409            }
410        }
411        events
412    }
413}
414
impl HttpClientBuilder for HttpClientContext {
    type Context = HttpClientContext;

    /// Returns a clone of this context.
    ///
    /// No additional context from request data is needed from this particular client, so the
    /// URI argument is ignored.
    fn build(&self, _uri: &Uri) -> Self::Context {
        self.clone()
    }
}
423
424fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
425    let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
426    let timezone = TimeZone::default();
427
428    Runtime::default()
429        .resolve(&mut target, program, &timezone)
430        .map_err(|error| {
431            warn!(message = "VRL runtime error.", source = %value, %error, internal_log_rate_limit = true);
432        })
433        .ok()
434        .and_then(|vrl_value| {
435            let json_value = serde_json::to_value(vrl_value).ok()?;
436
437            // Properly handle VRL values, so that key1: `upcase("foo")` will resolve
438            // properly as endpoint.com/key1=FOO and not endpoint.com/key1="FOO"
439            // similarly, `now()` should resolve to endpoint.com/key1=2025-06-07T10:39:08.662735Z
440            // and not endpoint.com/key1=t'2025-06-07T10:39:08.662735Z'
441            let resolved_string = match json_value {
442                serde_json::Value::String(s) => s,
443                value => value.to_string(),
444            };
445            Some(resolved_string)
446        })
447}
448
449impl http_client::HttpClientContext for HttpClientContext {
450    /// Decodes the HTTP response body into events per the decoder configured.
451    fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
452        // get the body into a byte array
453        let mut buf = BytesMut::new();
454        buf.extend_from_slice(body);
455
456        let events = self.decode_events(&mut buf);
457
458        Some(events)
459    }
460
461    /// Process the URL dynamically before each request
462    fn process_url(&self, url: &Uri) -> Option<Uri> {
463        // Early exit if there is no VRL to process
464        let query: &Query = &self.query;
465        if !query.has_vrl {
466            return None;
467        }
468
469        let mut processed_query = HashMap::new();
470
471        for (param_name, compiled_value) in &query.compiled {
472            match compiled_value {
473                CompiledQueryParameterValue::SingleParam(compiled_param) => {
474                    let result = match &compiled_param.program {
475                        Some(prog) => resolve_vrl(&compiled_param.value, prog)?,
476                        None => compiled_param.value.clone(),
477                    };
478
479                    processed_query.insert(
480                        param_name.clone(),
481                        QueryParameterValue::SingleParam(ParameterValue::String(result)),
482                    );
483                }
484                CompiledQueryParameterValue::MultiParams(compiled_params) => {
485                    let mut results = Vec::new();
486
487                    for param in compiled_params {
488                        let result = match &param.program {
489                            Some(p) => resolve_vrl(&param.value, p)?,
490                            None => param.value.clone(),
491                        };
492                        results.push(ParameterValue::String(result));
493                    }
494
495                    processed_query.insert(
496                        param_name.clone(),
497                        QueryParameterValue::MultiParams(results),
498                    );
499                }
500            };
501        }
502
503        // Extract the base URI without query parameters to avoid parameter duplication
504        let base_uri = Uri::builder()
505            .scheme(
506                url.scheme()
507                    .cloned()
508                    .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
509            )
510            .authority(
511                url.authority()
512                    .cloned()
513                    .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
514            )
515            .path_and_query(url.path().to_string())
516            .build()
517            .ok()?;
518
519        Some(build_url(&base_uri, &processed_query))
520    }
521
522    /// Enriches events with source_type, timestamp
523    fn enrich_events(&mut self, events: &mut Vec<Event>) {
524        let now = Utc::now();
525
526        for event in events {
527            match event {
528                Event::Log(log) => {
529                    self.log_namespace.insert_standard_vector_source_metadata(
530                        log,
531                        HttpClientConfig::NAME,
532                        now,
533                    );
534                }
535                Event::Metric(metric) => {
536                    if let Some(source_type_key) = log_schema().source_type_key() {
537                        metric.replace_tag(
538                            source_type_key.to_string(),
539                            HttpClientConfig::NAME.to_string(),
540                        );
541                    }
542                }
543                Event::Trace(trace) => {
544                    trace.maybe_insert(log_schema().source_type_key_target_path(), || {
545                        Bytes::from(HttpClientConfig::NAME).into()
546                    });
547                }
548            }
549        }
550    }
551}