1use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14 TimeZone,
15 codecs::{
16 StreamDecodingError,
17 decoding::{DeserializerConfig, FramingConfig},
18 },
19 compile_vrl,
20 config::{LogNamespace, SourceOutput, log_schema},
21 configurable::configurable_component,
22 event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25 compiler::{CompileConfig, Function, Program, runtime::Runtime},
26 prelude::TypeState,
27};
28
29use crate::{
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext},
32 format_vrl_diagnostics,
33 http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34 serde::{default_decoding, default_framing_message_based},
35 sources,
36 sources::util::{
37 http::HttpMethod,
38 http_client,
39 http_client::{
40 GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41 default_timeout, warn_if_interval_too_low,
42 },
43 },
44 tls::{TlsConfig, TlsSettings},
45};
46
47#[serde_as]
49#[configurable_component(source(
50 "http_client",
51 "Pull observability data from an HTTP server at a configured interval."
52))]
53#[derive(Clone, Debug)]
54pub struct HttpClientConfig {
55 #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
59 pub endpoint: String,
60
61 #[serde(default = "default_interval")]
65 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66 #[serde(rename = "scrape_interval_secs")]
67 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
68 pub interval: Duration,
69
70 #[serde(default = "default_timeout")]
72 #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
73 #[serde(rename = "scrape_timeout_secs")]
74 #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
75 pub timeout: Duration,
76
77 #[serde(default)]
88 #[configurable(metadata(
89 docs::additional_props_description = "A query string parameter and its value(s)."
90 ))]
91 #[configurable(metadata(docs::examples = "query_examples()"))]
92 pub query: QueryParameters,
93
94 #[configurable(derived)]
95 #[serde(default = "default_decoding")]
96 pub decoding: DeserializerConfig,
97
98 #[configurable(derived)]
100 #[serde(default = "default_framing_message_based")]
101 pub framing: FramingConfig,
102
103 #[serde(default)]
107 #[configurable(metadata(
108 docs::additional_props_description = "An HTTP request header and its value(s)."
109 ))]
110 #[configurable(metadata(docs::examples = "headers_examples()"))]
111 pub headers: HashMap<String, Vec<String>>,
112
113 #[serde(default = "default_http_method")]
115 pub method: HttpMethod,
116
117 #[serde(default)]
124 pub body: Option<ParameterValue>,
125
126 #[configurable(derived)]
128 pub tls: Option<TlsConfig>,
129
130 #[configurable(derived)]
132 pub auth: Option<Auth>,
133
134 #[configurable(metadata(docs::hidden))]
136 #[serde(default)]
137 pub log_namespace: Option<bool>,
138}
139
140const fn default_http_method() -> HttpMethod {
141 HttpMethod::Get
142}
143
144fn query_examples() -> QueryParameters {
145 HashMap::<_, _>::from_iter([
146 (
147 "field".to_owned(),
148 QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
149 ),
150 (
151 "fruit".to_owned(),
152 QueryParameterValue::MultiParams(vec![
153 ParameterValue::String("mango".to_owned()),
154 ParameterValue::String("papaya".to_owned()),
155 ParameterValue::String("kiwi".to_owned()),
156 ]),
157 ),
158 (
159 "start_time".to_owned(),
160 QueryParameterValue::SingleParam(ParameterValue::Typed {
161 value: "now()".to_owned(),
162 r#type: ParamType::Vrl,
163 }),
164 ),
165 ])
166}
167
/// Example `headers` table for the generated documentation; each header maps to one or
/// more values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    let entries: [(&str, &[&str]); 2] = [
        ("Accept", &["text/plain", "text/html"]),
        ("X-My-Custom-Header", &["a", "vector", "of", "values"]),
    ];

    entries
        .iter()
        .map(|(name, values)| {
            let owned: Vec<String> = values.iter().map(|value| (*value).to_owned()).collect();
            ((*name).to_owned(), owned)
        })
        .collect()
}
185
186fn get_vrl_functions() -> Vec<Box<dyn Function>> {
188 vrl::stdlib::all()
189 .into_iter()
190 .chain(vector_lib::enrichment::vrl_functions())
191 .chain(vector_vrl_functions::all())
192 .collect()
193}
194
195fn compile_parameter_vrl(
197 param: &ParameterValue,
198 functions: &[Box<dyn Function>],
199) -> Result<Option<Program>, sources::BuildError> {
200 if !param.is_vrl() {
201 return Ok(None);
202 }
203
204 let state = TypeState::default();
205 let config = CompileConfig::default();
206
207 match compile_vrl(param.value(), functions, &state, config) {
208 Ok(compilation_result) => {
209 if !compilation_result.warnings.is_empty() {
210 let warnings = format_vrl_diagnostics(param.value(), compilation_result.warnings);
211 warn!(message = "VRL compilation warnings.", %warnings);
212 }
213 Ok(Some(compilation_result.program))
214 }
215 Err(diagnostics) => {
216 let error = format_vrl_diagnostics(param.value(), diagnostics);
217 Err(sources::BuildError::VrlCompilationError {
218 message: format!("VRL compilation failed: {}", error),
219 })
220 }
221 }
222}
223
224impl Default for HttpClientConfig {
225 fn default() -> Self {
226 Self {
227 endpoint: "http://localhost:9898/logs".to_string(),
228 query: HashMap::new(),
229 interval: default_interval(),
230 timeout: default_timeout(),
231 decoding: default_decoding(),
232 framing: default_framing_message_based(),
233 headers: HashMap::new(),
234 method: default_http_method(),
235 body: None,
236 tls: None,
237 auth: None,
238 log_namespace: None,
239 }
240 }
241}
242
243impl_generate_config_from_default!(HttpClientConfig);
244
245#[derive(Clone)]
246pub struct CompiledParam {
247 value: String,
248 program: Option<Program>,
249}
250
251#[derive(Clone)]
252pub enum CompiledQueryParameterValue {
253 SingleParam(Box<CompiledParam>),
254 MultiParams(Vec<CompiledParam>),
255}
256
257impl CompiledQueryParameterValue {
258 fn has_vrl(&self) -> bool {
259 match self {
260 CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
261 CompiledQueryParameterValue::MultiParams(params) => {
262 params.iter().any(|p| p.program.is_some())
263 }
264 }
265 }
266}
267
268#[derive(Clone)]
269pub struct Query {
270 original: HashMap<String, QueryParameterValue>,
271 compiled: HashMap<String, CompiledQueryParameterValue>,
272 has_vrl: bool,
273}
274
275impl Query {
276 pub fn new(params: &HashMap<String, QueryParameterValue>) -> Result<Self, sources::BuildError> {
277 let functions = get_vrl_functions();
278
279 let mut compiled: HashMap<String, CompiledQueryParameterValue> = HashMap::new();
280
281 for (k, v) in params.iter() {
282 let compiled_param = Self::compile_param(v, &functions)?;
283 compiled.insert(k.clone(), compiled_param);
284 }
285
286 let has_vrl = compiled.values().any(|v| v.has_vrl());
287
288 Ok(Query {
289 original: params.clone(),
290 compiled,
291 has_vrl,
292 })
293 }
294
295 fn compile_value(
296 param: &ParameterValue,
297 functions: &[Box<dyn Function>],
298 ) -> Result<CompiledParam, sources::BuildError> {
299 let program = compile_parameter_vrl(param, functions)?;
300
301 Ok(CompiledParam {
302 value: param.value().to_string(),
303 program,
304 })
305 }
306
307 fn compile_param(
308 value: &QueryParameterValue,
309 functions: &[Box<dyn Function>],
310 ) -> Result<CompiledQueryParameterValue, sources::BuildError> {
311 match value {
312 QueryParameterValue::SingleParam(param) => {
313 Ok(CompiledQueryParameterValue::SingleParam(Box::new(
314 Self::compile_value(param, functions)?,
315 )))
316 }
317 QueryParameterValue::MultiParams(params) => {
318 let compiled = params
319 .iter()
320 .map(|p| Self::compile_value(p, functions))
321 .collect::<Result<Vec<_>, _>>()?;
322 Ok(CompiledQueryParameterValue::MultiParams(compiled))
323 }
324 }
325 }
326}
327
328#[async_trait::async_trait]
329#[typetag::serde(name = "http_client")]
330impl SourceConfig for HttpClientConfig {
331 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
332 let query = Query::new(&self.query)?;
333 let functions = get_vrl_functions();
334
335 let body = self
337 .body
338 .as_ref()
339 .map(|body_param| -> Result<CompiledParam, sources::BuildError> {
340 let program = compile_parameter_vrl(body_param, &functions)?;
341 Ok(CompiledParam {
342 value: body_param.value().to_string(),
343 program,
344 })
345 })
346 .transpose()?;
347
348 let endpoints = [self.endpoint.clone()];
350 let urls: Vec<Uri> = endpoints
351 .iter()
352 .map(|s| {
353 let uri = s.parse::<Uri>().context(sources::UriParseSnafu)?;
354 Ok(if query.has_vrl {
357 uri
358 } else {
359 build_url(&uri, &query.original)
360 })
361 })
362 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
363
364 let tls = TlsSettings::from_options(self.tls.as_ref())?;
365
366 let log_namespace = cx.log_namespace(self.log_namespace);
367
368 let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
370
371 let content_type = self.decoding.content_type(&self.framing).to_string();
372
373 let context = HttpClientContext {
375 decoder,
376 log_namespace,
377 query,
378 body,
379 };
380
381 warn_if_interval_too_low(self.timeout, self.interval);
382
383 let inputs = GenericHttpClientInputs {
384 urls,
385 interval: self.interval,
386 timeout: self.timeout,
387 headers: self.headers.clone(),
388 content_type,
389 auth: self.auth.clone(),
390 tls,
391 proxy: cx.proxy.clone(),
392 shutdown: cx.shutdown,
393 };
394
395 Ok(call(inputs, context, cx.out, self.method).boxed())
396 }
397
398 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
399 let log_namespace = global_log_namespace.merge(self.log_namespace);
402
403 let schema_definition = self
404 .decoding
405 .schema_definition(log_namespace)
406 .with_standard_vector_source_metadata();
407
408 vec![SourceOutput::new_maybe_logs(
409 self.decoding.output_type(),
410 schema_definition,
411 )]
412 }
413
414 fn can_acknowledge(&self) -> bool {
415 false
416 }
417}
418
419impl HttpClientConfig {
420 pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
421 let decoding = self.decoding.clone();
422 let framing = self.framing.clone();
423 let log_namespace =
424 log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
425
426 DecodingConfig::new(framing, decoding, log_namespace)
427 }
428}
429
430#[derive(Clone)]
432pub struct HttpClientContext {
433 pub decoder: Decoder,
434 pub log_namespace: LogNamespace,
435 query: Query,
436 body: Option<CompiledParam>,
437}
438
439impl HttpClientContext {
440 fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
442 let mut events = Vec::new();
443 loop {
444 match self.decoder.decode_eof(buf) {
445 Ok(Some((next, _))) => {
446 events.extend(next);
447 }
448 Ok(None) => break,
449 Err(error) => {
450 if !error.can_continue() {
453 break;
454 }
455 break;
456 }
457 }
458 }
459 events
460 }
461}
462
463impl HttpClientBuilder for HttpClientContext {
464 type Context = HttpClientContext;
465
466 fn build(&self, _uri: &Uri) -> Self::Context {
468 self.clone()
469 }
470}
471
472fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
473 let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
474 let timezone = TimeZone::default();
475
476 Runtime::default()
477 .resolve(&mut target, program, &timezone)
478 .map_err(|error| {
479 warn!(message = "VRL runtime error.", source = %value, %error);
480 })
481 .ok()
482 .and_then(|vrl_value| {
483 let json_value = serde_json::to_value(vrl_value).ok()?;
484
485 let resolved_string = match json_value {
490 serde_json::Value::String(s) => s,
491 value => value.to_string(),
492 };
493 Some(resolved_string)
494 })
495}
496
497fn resolve_compiled_param(compiled: &CompiledParam) -> Option<String> {
499 match &compiled.program {
500 Some(program) => resolve_vrl(&compiled.value, program),
501 None => Some(compiled.value.clone()),
502 }
503}
504
505impl http_client::HttpClientContext for HttpClientContext {
506 fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
508 let mut buf = BytesMut::new();
510 buf.extend_from_slice(body);
511
512 let events = self.decode_events(&mut buf);
513
514 Some(events)
515 }
516
517 fn get_request_body(&self) -> Option<String> {
519 self.body.as_ref().and_then(resolve_compiled_param)
520 }
521
522 fn process_url(&self, url: &Uri) -> Option<Uri> {
524 if !self.query.has_vrl {
525 return None;
526 }
527
528 let processed_query: Option<HashMap<_, _>> = self
530 .query
531 .compiled
532 .iter()
533 .map(|(name, value)| {
534 let resolved = match value {
535 CompiledQueryParameterValue::SingleParam(param) => {
536 let result = resolve_compiled_param(param)?;
537 QueryParameterValue::SingleParam(ParameterValue::String(result))
538 }
539 CompiledQueryParameterValue::MultiParams(params) => {
540 let results: Option<Vec<_>> = params
541 .iter()
542 .map(|p| resolve_compiled_param(p).map(ParameterValue::String))
543 .collect();
544 QueryParameterValue::MultiParams(results?)
545 }
546 };
547 Some((name.clone(), resolved))
548 })
549 .collect();
550
551 let base_uri = Uri::builder()
553 .scheme(
554 url.scheme()
555 .cloned()
556 .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
557 )
558 .authority(
559 url.authority()
560 .cloned()
561 .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
562 )
563 .path_and_query(url.path().to_string())
564 .build()
565 .ok()?;
566
567 Some(build_url(&base_uri, &processed_query?))
568 }
569
570 fn enrich_events(&mut self, events: &mut Vec<Event>) {
572 let now = Utc::now();
573
574 for event in events {
575 match event {
576 Event::Log(log) => {
577 self.log_namespace.insert_standard_vector_source_metadata(
578 log,
579 HttpClientConfig::NAME,
580 now,
581 );
582 }
583 Event::Metric(metric) => {
584 if let Some(source_type_key) = log_schema().source_type_key() {
585 metric.replace_tag(
586 source_type_key.to_string(),
587 HttpClientConfig::NAME.to_string(),
588 );
589 }
590 }
591 Event::Trace(trace) => {
592 trace.maybe_insert(log_schema().source_type_key_target_path(), || {
593 Bytes::from(HttpClientConfig::NAME).into()
594 });
595 }
596 }
597 }
598 }
599}