vector/sources/http_client/
client.rs

1//! Generalized HTTP client source.
2//! Calls an endpoint at an interval, decoding the HTTP responses into events.
3
4use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14    TimeZone,
15    codecs::{
16        StreamDecodingError,
17        decoding::{DeserializerConfig, FramingConfig},
18    },
19    compile_vrl,
20    config::{LogNamespace, SourceOutput, log_schema},
21    configurable::configurable_component,
22    event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25    compiler::{CompileConfig, Function, Program, runtime::Runtime},
26    prelude::TypeState,
27};
28
29use crate::{
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext},
32    format_vrl_diagnostics,
33    http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34    serde::{default_decoding, default_framing_message_based},
35    sources,
36    sources::util::{
37        http::HttpMethod,
38        http_client,
39        http_client::{
40            GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41            default_timeout, warn_if_interval_too_low,
42        },
43    },
44    tls::{TlsConfig, TlsSettings},
45};
46
47/// Configuration for the `http_client` source.
48#[serde_as]
49#[configurable_component(source(
50    "http_client",
51    "Pull observability data from an HTTP server at a configured interval."
52))]
53#[derive(Clone, Debug)]
54pub struct HttpClientConfig {
55    /// The HTTP endpoint to collect events from.
56    ///
57    /// The full path must be specified.
58    #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
59    pub endpoint: String,
60
61    /// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
62    /// than the interval a new scrape will be started. This can take extra resources, set the timeout
63    /// to a value lower than the scrape interval to prevent this from happening.
64    #[serde(default = "default_interval")]
65    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66    #[serde(rename = "scrape_interval_secs")]
67    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
68    pub interval: Duration,
69
70    /// The timeout for each scrape request.
71    #[serde(default = "default_timeout")]
72    #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
73    #[serde(rename = "scrape_timeout_secs")]
74    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
75    pub timeout: Duration,
76
77    /// Custom parameters for the HTTP request query string.
78    ///
79    /// One or more values for the same parameter key can be provided.
80    ///
81    /// The parameters provided in this option are appended to any parameters
82    /// manually provided in the `endpoint` option.
83    ///
84    /// VRL functions are supported within query parameters. You can
85    /// use functions like `now()` to dynamically modify query
86    /// parameter values.
87    #[serde(default)]
88    #[configurable(metadata(
89        docs::additional_props_description = "A query string parameter and its value(s)."
90    ))]
91    #[configurable(metadata(docs::examples = "query_examples()"))]
92    pub query: QueryParameters,
93
94    #[configurable(derived)]
95    #[serde(default = "default_decoding")]
96    pub decoding: DeserializerConfig,
97
98    /// Framing to use in the decoding.
99    #[configurable(derived)]
100    #[serde(default = "default_framing_message_based")]
101    pub framing: FramingConfig,
102
103    /// Headers to apply to the HTTP requests.
104    ///
105    /// One or more values for the same header can be provided.
106    #[serde(default)]
107    #[configurable(metadata(
108        docs::additional_props_description = "An HTTP request header and its value(s)."
109    ))]
110    #[configurable(metadata(docs::examples = "headers_examples()"))]
111    pub headers: HashMap<String, Vec<String>>,
112
113    /// Specifies the method of the HTTP request.
114    #[serde(default = "default_http_method")]
115    pub method: HttpMethod,
116
117    /// TLS configuration.
118    #[configurable(derived)]
119    pub tls: Option<TlsConfig>,
120
121    /// HTTP Authentication.
122    #[configurable(derived)]
123    pub auth: Option<Auth>,
124
125    /// The namespace to use for logs. This overrides the global setting.
126    #[configurable(metadata(docs::hidden))]
127    #[serde(default)]
128    pub log_namespace: Option<bool>,
129}
130
131const fn default_http_method() -> HttpMethod {
132    HttpMethod::Get
133}
134
135fn query_examples() -> QueryParameters {
136    HashMap::<_, _>::from_iter([
137        (
138            "field".to_owned(),
139            QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
140        ),
141        (
142            "fruit".to_owned(),
143            QueryParameterValue::MultiParams(vec![
144                ParameterValue::String("mango".to_owned()),
145                ParameterValue::String("papaya".to_owned()),
146                ParameterValue::String("kiwi".to_owned()),
147            ]),
148        ),
149        (
150            "start_time".to_owned(),
151            QueryParameterValue::SingleParam(ParameterValue::Typed {
152                value: "now()".to_owned(),
153                r#type: ParamType::Vrl,
154            }),
155        ),
156    ])
157}
158
/// Example `headers` value used by the generated configuration docs: two
/// header names, each carrying several values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    let examples: [(&str, &[&str]); 2] = [
        ("Accept", &["text/plain", "text/html"]),
        ("X-My-Custom-Header", &["a", "vector", "of", "values"]),
    ];

    examples
        .into_iter()
        .map(|(name, values)| {
            let values = values.iter().map(|value| (*value).to_owned()).collect();
            (name.to_owned(), values)
        })
        .collect()
}
176
177impl Default for HttpClientConfig {
178    fn default() -> Self {
179        Self {
180            endpoint: "http://localhost:9898/logs".to_string(),
181            query: HashMap::new(),
182            interval: default_interval(),
183            timeout: default_timeout(),
184            decoding: default_decoding(),
185            framing: default_framing_message_based(),
186            headers: HashMap::new(),
187            method: default_http_method(),
188            tls: None,
189            auth: None,
190            log_namespace: None,
191        }
192    }
193}
194
195impl_generate_config_from_default!(HttpClientConfig);
196
197#[derive(Clone)]
198pub struct CompiledParam {
199    value: String,
200    program: Option<Program>,
201}
202
203#[derive(Clone)]
204pub enum CompiledQueryParameterValue {
205    SingleParam(Box<CompiledParam>),
206    MultiParams(Vec<CompiledParam>),
207}
208
209#[derive(Clone)]
210pub struct Query {
211    original: HashMap<String, QueryParameterValue>,
212    compiled: HashMap<String, CompiledQueryParameterValue>,
213    has_vrl: bool,
214}
215
216impl Query {
217    pub fn new(params: &HashMap<String, QueryParameterValue>) -> Self {
218        let functions = vrl::stdlib::all()
219            .into_iter()
220            .chain(vector_lib::enrichment::vrl_functions())
221            .chain(vector_vrl_functions::all())
222            .collect::<Vec<_>>();
223
224        let compiled: HashMap<String, CompiledQueryParameterValue> = params
225            .iter()
226            .map(|(k, v)| (k.clone(), Self::compile_param(v, &functions)))
227            .collect();
228
229        let has_vrl = compiled.values().any(|compiled| match compiled {
230            CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
231            CompiledQueryParameterValue::MultiParams(params) => {
232                params.iter().any(|p| p.program.is_some())
233            }
234        });
235
236        Query {
237            original: params.clone(),
238            compiled,
239            has_vrl,
240        }
241    }
242
243    fn compile_value(param: &ParameterValue, functions: &[Box<dyn Function>]) -> CompiledParam {
244        let program = if param.is_vrl() {
245            let state = TypeState::default();
246            let config = CompileConfig::default();
247
248            match compile_vrl(param.value(), functions, &state, config) {
249                Ok(compilation_result) => {
250                    if !compilation_result.warnings.is_empty() {
251                        let warnings =
252                            format_vrl_diagnostics(param.value(), compilation_result.warnings);
253                        warn!(message = "VRL compilation warnings.", %warnings);
254                    }
255                    Some(compilation_result.program)
256                }
257                Err(diagnostics) => {
258                    let error = format_vrl_diagnostics(param.value(), diagnostics);
259                    warn!(message = "VRL compilation failed.", %error);
260                    None
261                }
262            }
263        } else {
264            None
265        };
266
267        CompiledParam {
268            value: param.value().to_string(),
269            program,
270        }
271    }
272
273    fn compile_param(
274        value: &QueryParameterValue,
275        functions: &[Box<dyn Function>],
276    ) -> CompiledQueryParameterValue {
277        match value {
278            QueryParameterValue::SingleParam(param) => CompiledQueryParameterValue::SingleParam(
279                Box::new(Self::compile_value(param, functions)),
280            ),
281            QueryParameterValue::MultiParams(params) => {
282                let compiled = params
283                    .iter()
284                    .map(|p| Self::compile_value(p, functions))
285                    .collect();
286                CompiledQueryParameterValue::MultiParams(compiled)
287            }
288        }
289    }
290}
291
292#[async_trait::async_trait]
293#[typetag::serde(name = "http_client")]
294impl SourceConfig for HttpClientConfig {
295    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
296        let query = Query::new(&self.query.clone());
297
298        // Build the base URLs
299        let endpoints = [self.endpoint.clone()];
300        let urls: Vec<Uri> = endpoints
301            .iter()
302            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
303            .map(|r| {
304                if query.has_vrl {
305                    // For URLs with VRL expressions, don't add query parameters here
306                    // They'll be added dynamically during the HTTP request
307                    r
308                } else {
309                    // For URLs without VRL expressions, add query parameters now
310                    r.map(|uri| build_url(&uri, &query.original))
311                }
312            })
313            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
314
315        let tls = TlsSettings::from_options(self.tls.as_ref())?;
316
317        let log_namespace = cx.log_namespace(self.log_namespace);
318
319        // build the decoder
320        let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
321
322        let content_type = self.decoding.content_type(&self.framing).to_string();
323
324        // Create context with the config for dynamic query parameter evaluation
325        let context = HttpClientContext {
326            decoder,
327            log_namespace,
328            query,
329        };
330
331        warn_if_interval_too_low(self.timeout, self.interval);
332
333        let inputs = GenericHttpClientInputs {
334            urls,
335            interval: self.interval,
336            timeout: self.timeout,
337            headers: self.headers.clone(),
338            content_type,
339            auth: self.auth.clone(),
340            tls,
341            proxy: cx.proxy.clone(),
342            shutdown: cx.shutdown,
343        };
344
345        Ok(call(inputs, context, cx.out, self.method).boxed())
346    }
347
348    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
349        // There is a global and per-source `log_namespace` config. The source config overrides the global setting,
350        // and is merged here.
351        let log_namespace = global_log_namespace.merge(self.log_namespace);
352
353        let schema_definition = self
354            .decoding
355            .schema_definition(log_namespace)
356            .with_standard_vector_source_metadata();
357
358        vec![SourceOutput::new_maybe_logs(
359            self.decoding.output_type(),
360            schema_definition,
361        )]
362    }
363
364    fn can_acknowledge(&self) -> bool {
365        false
366    }
367}
368
369impl HttpClientConfig {
370    pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
371        let decoding = self.decoding.clone();
372        let framing = self.framing.clone();
373        let log_namespace =
374            log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
375
376        DecodingConfig::new(framing, decoding, log_namespace)
377    }
378}
379
380/// Captures the configuration options required to decode the incoming requests into events.
381#[derive(Clone)]
382pub struct HttpClientContext {
383    pub decoder: Decoder,
384    pub log_namespace: LogNamespace,
385    query: Query,
386}
387
388impl HttpClientContext {
389    /// Decode the events from the byte buffer
390    fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
391        let mut events = Vec::new();
392        loop {
393            match self.decoder.decode_eof(buf) {
394                Ok(Some((next, _))) => {
395                    events.extend(next);
396                }
397                Ok(None) => break,
398                Err(error) => {
399                    // Error is logged by `crate::codecs::Decoder`, no further
400                    // handling is needed here.
401                    if !error.can_continue() {
402                        break;
403                    }
404                    break;
405                }
406            }
407        }
408        events
409    }
410}
411
412impl HttpClientBuilder for HttpClientContext {
413    type Context = HttpClientContext;
414
415    /// No additional context from request data is needed from this particular client.
416    fn build(&self, _uri: &Uri) -> Self::Context {
417        self.clone()
418    }
419}
420
421fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
422    let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
423    let timezone = TimeZone::default();
424
425    Runtime::default()
426        .resolve(&mut target, program, &timezone)
427        .map_err(|error| {
428            warn!(message = "VRL runtime error.", source = %value, %error);
429        })
430        .ok()
431        .and_then(|vrl_value| {
432            let json_value = serde_json::to_value(vrl_value).ok()?;
433
434            // Properly handle VRL values, so that key1: `upcase("foo")` will resolve
435            // properly as endpoint.com/key1=FOO and not endpoint.com/key1="FOO"
436            // similarly, `now()` should resolve to endpoint.com/key1=2025-06-07T10:39:08.662735Z
437            // and not endpoint.com/key1=t'2025-06-07T10:39:08.662735Z'
438            let resolved_string = match json_value {
439                serde_json::Value::String(s) => s,
440                value => value.to_string(),
441            };
442            Some(resolved_string)
443        })
444}
445
446impl http_client::HttpClientContext for HttpClientContext {
447    /// Decodes the HTTP response body into events per the decoder configured.
448    fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
449        // get the body into a byte array
450        let mut buf = BytesMut::new();
451        buf.extend_from_slice(body);
452
453        let events = self.decode_events(&mut buf);
454
455        Some(events)
456    }
457
458    /// Process the URL dynamically before each request
459    fn process_url(&self, url: &Uri) -> Option<Uri> {
460        // Early exit if there is no VRL to process
461        let query: &Query = &self.query;
462        if !query.has_vrl {
463            return None;
464        }
465
466        let mut processed_query = HashMap::new();
467
468        for (param_name, compiled_value) in &query.compiled {
469            match compiled_value {
470                CompiledQueryParameterValue::SingleParam(compiled_param) => {
471                    let result = match &compiled_param.program {
472                        Some(prog) => resolve_vrl(&compiled_param.value, prog)?,
473                        None => compiled_param.value.clone(),
474                    };
475
476                    processed_query.insert(
477                        param_name.clone(),
478                        QueryParameterValue::SingleParam(ParameterValue::String(result)),
479                    );
480                }
481                CompiledQueryParameterValue::MultiParams(compiled_params) => {
482                    let mut results = Vec::new();
483
484                    for param in compiled_params {
485                        let result = match &param.program {
486                            Some(p) => resolve_vrl(&param.value, p)?,
487                            None => param.value.clone(),
488                        };
489                        results.push(ParameterValue::String(result));
490                    }
491
492                    processed_query.insert(
493                        param_name.clone(),
494                        QueryParameterValue::MultiParams(results),
495                    );
496                }
497            };
498        }
499
500        // Extract the base URI without query parameters to avoid parameter duplication
501        let base_uri = Uri::builder()
502            .scheme(
503                url.scheme()
504                    .cloned()
505                    .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
506            )
507            .authority(
508                url.authority()
509                    .cloned()
510                    .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
511            )
512            .path_and_query(url.path().to_string())
513            .build()
514            .ok()?;
515
516        Some(build_url(&base_uri, &processed_query))
517    }
518
519    /// Enriches events with source_type, timestamp
520    fn enrich_events(&mut self, events: &mut Vec<Event>) {
521        let now = Utc::now();
522
523        for event in events {
524            match event {
525                Event::Log(log) => {
526                    self.log_namespace.insert_standard_vector_source_metadata(
527                        log,
528                        HttpClientConfig::NAME,
529                        now,
530                    );
531                }
532                Event::Metric(metric) => {
533                    if let Some(source_type_key) = log_schema().source_type_key() {
534                        metric.replace_tag(
535                            source_type_key.to_string(),
536                            HttpClientConfig::NAME.to_string(),
537                        );
538                    }
539                }
540                Event::Trace(trace) => {
541                    trace.maybe_insert(log_schema().source_type_key_target_path(), || {
542                        Bytes::from(HttpClientConfig::NAME).into()
543                    });
544                }
545            }
546        }
547    }
548}