1use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14 TimeZone,
15 codecs::{
16 StreamDecodingError,
17 decoding::{DeserializerConfig, FramingConfig},
18 },
19 compile_vrl,
20 config::{LogNamespace, SourceOutput, log_schema},
21 configurable::configurable_component,
22 event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25 compiler::{CompileConfig, Function, Program, runtime::Runtime},
26 prelude::TypeState,
27};
28
29use crate::{
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext},
32 format_vrl_diagnostics,
33 http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34 serde::{default_decoding, default_framing_message_based},
35 sources,
36 sources::util::{
37 http::HttpMethod,
38 http_client,
39 http_client::{
40 GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41 default_timeout, warn_if_interval_too_low,
42 },
43 },
44 tls::{TlsConfig, TlsSettings},
45};
46
47#[serde_as]
49#[configurable_component(source(
50 "http_client",
51 "Pull observability data from an HTTP server at a configured interval."
52))]
53#[derive(Clone, Debug)]
54pub struct HttpClientConfig {
55 #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
59 pub endpoint: String,
60
61 #[serde(default = "default_interval")]
65 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66 #[serde(rename = "scrape_interval_secs")]
67 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
68 pub interval: Duration,
69
70 #[serde(default = "default_timeout")]
72 #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
73 #[serde(rename = "scrape_timeout_secs")]
74 #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
75 pub timeout: Duration,
76
77 #[serde(default)]
88 #[configurable(metadata(
89 docs::additional_props_description = "A query string parameter and its value(s)."
90 ))]
91 #[configurable(metadata(docs::examples = "query_examples()"))]
92 pub query: QueryParameters,
93
94 #[configurable(derived)]
95 #[serde(default = "default_decoding")]
96 pub decoding: DeserializerConfig,
97
98 #[configurable(derived)]
100 #[serde(default = "default_framing_message_based")]
101 pub framing: FramingConfig,
102
103 #[serde(default)]
107 #[configurable(metadata(
108 docs::additional_props_description = "An HTTP request header and its value(s)."
109 ))]
110 #[configurable(metadata(docs::examples = "headers_examples()"))]
111 pub headers: HashMap<String, Vec<String>>,
112
113 #[serde(default = "default_http_method")]
115 pub method: HttpMethod,
116
117 #[serde(default)]
124 pub body: Option<ParameterValue>,
125
126 #[configurable(derived)]
128 pub tls: Option<TlsConfig>,
129
130 #[configurable(derived)]
132 pub auth: Option<Auth>,
133
134 #[configurable(metadata(docs::hidden))]
136 #[serde(default)]
137 pub log_namespace: Option<bool>,
138}
139
140const fn default_http_method() -> HttpMethod {
141 HttpMethod::Get
142}
143
144fn query_examples() -> QueryParameters {
145 HashMap::<_, _>::from_iter([
146 (
147 "field".to_owned(),
148 QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
149 ),
150 (
151 "fruit".to_owned(),
152 QueryParameterValue::MultiParams(vec![
153 ParameterValue::String("mango".to_owned()),
154 ParameterValue::String("papaya".to_owned()),
155 ParameterValue::String("kiwi".to_owned()),
156 ]),
157 ),
158 (
159 "start_time".to_owned(),
160 QueryParameterValue::SingleParam(ParameterValue::Typed {
161 value: "now()".to_owned(),
162 r#type: ParamType::Vrl,
163 }),
164 ),
165 ])
166}
167
/// Example request headers referenced by the `docs::examples` metadata on
/// [`HttpClientConfig::headers`]; each header maps to a list of values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    // Converts a list of string literals into owned header values.
    fn owned(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| (*value).to_owned()).collect()
    }

    HashMap::from([
        ("Accept".to_owned(), owned(&["text/plain", "text/html"])),
        (
            "X-My-Custom-Header".to_owned(),
            owned(&["a", "vector", "of", "values"]),
        ),
    ])
}
185
186fn compile_parameter_vrl(
188 param: &ParameterValue,
189 functions: &[Box<dyn Function>],
190) -> Result<Option<Program>, sources::BuildError> {
191 if !param.is_vrl() {
192 return Ok(None);
193 }
194
195 let state = TypeState::default();
196 let config = CompileConfig::default();
197
198 match compile_vrl(param.value(), functions, &state, config) {
199 Ok(compilation_result) => {
200 if !compilation_result.warnings.is_empty() {
201 let warnings = format_vrl_diagnostics(param.value(), compilation_result.warnings);
202 warn!(message = "VRL compilation warnings.", %warnings);
203 }
204 Ok(Some(compilation_result.program))
205 }
206 Err(diagnostics) => {
207 let error = format_vrl_diagnostics(param.value(), diagnostics);
208 Err(sources::BuildError::VrlCompilationError {
209 message: format!("VRL compilation failed: {}", error),
210 })
211 }
212 }
213}
214
215impl Default for HttpClientConfig {
216 fn default() -> Self {
217 Self {
218 endpoint: "http://localhost:9898/logs".to_string(),
219 query: HashMap::new(),
220 interval: default_interval(),
221 timeout: default_timeout(),
222 decoding: default_decoding(),
223 framing: default_framing_message_based(),
224 headers: HashMap::new(),
225 method: default_http_method(),
226 body: None,
227 tls: None,
228 auth: None,
229 log_namespace: None,
230 }
231 }
232}
233
234impl_generate_config_from_default!(HttpClientConfig);
235
236#[derive(Clone)]
237pub struct CompiledParam {
238 value: String,
239 program: Option<Program>,
240}
241
242#[derive(Clone)]
243pub enum CompiledQueryParameterValue {
244 SingleParam(Box<CompiledParam>),
245 MultiParams(Vec<CompiledParam>),
246}
247
248impl CompiledQueryParameterValue {
249 fn has_vrl(&self) -> bool {
250 match self {
251 CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
252 CompiledQueryParameterValue::MultiParams(params) => {
253 params.iter().any(|p| p.program.is_some())
254 }
255 }
256 }
257}
258
259#[derive(Clone)]
260pub struct Query {
261 original: HashMap<String, QueryParameterValue>,
262 compiled: HashMap<String, CompiledQueryParameterValue>,
263 has_vrl: bool,
264}
265
266impl Query {
267 pub fn new(params: &HashMap<String, QueryParameterValue>) -> Result<Self, sources::BuildError> {
268 let functions = vector_vrl_functions::all();
269
270 let mut compiled: HashMap<String, CompiledQueryParameterValue> = HashMap::new();
271
272 for (k, v) in params.iter() {
273 let compiled_param = Self::compile_param(v, &functions)?;
274 compiled.insert(k.clone(), compiled_param);
275 }
276
277 let has_vrl = compiled.values().any(|v| v.has_vrl());
278
279 Ok(Query {
280 original: params.clone(),
281 compiled,
282 has_vrl,
283 })
284 }
285
286 fn compile_value(
287 param: &ParameterValue,
288 functions: &[Box<dyn Function>],
289 ) -> Result<CompiledParam, sources::BuildError> {
290 let program = compile_parameter_vrl(param, functions)?;
291
292 Ok(CompiledParam {
293 value: param.value().to_string(),
294 program,
295 })
296 }
297
298 fn compile_param(
299 value: &QueryParameterValue,
300 functions: &[Box<dyn Function>],
301 ) -> Result<CompiledQueryParameterValue, sources::BuildError> {
302 match value {
303 QueryParameterValue::SingleParam(param) => {
304 Ok(CompiledQueryParameterValue::SingleParam(Box::new(
305 Self::compile_value(param, functions)?,
306 )))
307 }
308 QueryParameterValue::MultiParams(params) => {
309 let compiled = params
310 .iter()
311 .map(|p| Self::compile_value(p, functions))
312 .collect::<Result<Vec<_>, _>>()?;
313 Ok(CompiledQueryParameterValue::MultiParams(compiled))
314 }
315 }
316 }
317}
318
319#[async_trait::async_trait]
320#[typetag::serde(name = "http_client")]
321impl SourceConfig for HttpClientConfig {
322 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
323 let query = Query::new(&self.query)?;
324 let functions = vector_vrl_functions::all();
325
326 let body = self
328 .body
329 .as_ref()
330 .map(|body_param| -> Result<CompiledParam, sources::BuildError> {
331 let program = compile_parameter_vrl(body_param, &functions)?;
332 Ok(CompiledParam {
333 value: body_param.value().to_string(),
334 program,
335 })
336 })
337 .transpose()?;
338
339 let endpoints = [self.endpoint.clone()];
341 let urls: Vec<Uri> = endpoints
342 .iter()
343 .map(|s| {
344 let uri = s.parse::<Uri>().context(sources::UriParseSnafu)?;
345 Ok(if query.has_vrl {
348 uri
349 } else {
350 build_url(&uri, &query.original)
351 })
352 })
353 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
354
355 let tls = TlsSettings::from_options(self.tls.as_ref())?;
356
357 let log_namespace = cx.log_namespace(self.log_namespace);
358
359 let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
361
362 let content_type = self.decoding.content_type(&self.framing).to_string();
363
364 let context = HttpClientContext {
366 decoder,
367 log_namespace,
368 query,
369 body,
370 };
371
372 warn_if_interval_too_low(self.timeout, self.interval);
373
374 let inputs = GenericHttpClientInputs {
375 urls,
376 interval: self.interval,
377 timeout: self.timeout,
378 headers: self.headers.clone(),
379 content_type,
380 auth: self.auth.clone(),
381 tls,
382 proxy: cx.proxy.clone(),
383 shutdown: cx.shutdown,
384 };
385
386 Ok(call(inputs, context, cx.out, self.method).boxed())
387 }
388
389 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
390 let log_namespace = global_log_namespace.merge(self.log_namespace);
393
394 let schema_definition = self
395 .decoding
396 .schema_definition(log_namespace)
397 .with_standard_vector_source_metadata();
398
399 vec![SourceOutput::new_maybe_logs(
400 self.decoding.output_type(),
401 schema_definition,
402 )]
403 }
404
405 fn can_acknowledge(&self) -> bool {
406 false
407 }
408}
409
410impl HttpClientConfig {
411 pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
412 let decoding = self.decoding.clone();
413 let framing = self.framing.clone();
414 let log_namespace =
415 log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
416
417 DecodingConfig::new(framing, decoding, log_namespace)
418 }
419}
420
421#[derive(Clone)]
423pub struct HttpClientContext {
424 pub decoder: Decoder,
425 pub log_namespace: LogNamespace,
426 query: Query,
427 body: Option<CompiledParam>,
428}
429
430impl HttpClientContext {
431 fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
433 let mut events = Vec::new();
434 loop {
435 match self.decoder.decode_eof(buf) {
436 Ok(Some((next, _))) => {
437 events.extend(next);
438 }
439 Ok(None) => break,
440 Err(error) => {
441 if !error.can_continue() {
444 break;
445 }
446 break;
447 }
448 }
449 }
450 events
451 }
452}
453
454impl HttpClientBuilder for HttpClientContext {
455 type Context = HttpClientContext;
456
457 fn build(&self, _uri: &Uri) -> Self::Context {
459 self.clone()
460 }
461}
462
463fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
464 let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
465 let timezone = TimeZone::default();
466
467 Runtime::default()
468 .resolve(&mut target, program, &timezone)
469 .map_err(|error| {
470 warn!(message = "VRL runtime error.", source = %value, %error);
471 })
472 .ok()
473 .and_then(|vrl_value| {
474 let json_value = serde_json::to_value(vrl_value).ok()?;
475
476 let resolved_string = match json_value {
481 serde_json::Value::String(s) => s,
482 value => value.to_string(),
483 };
484 Some(resolved_string)
485 })
486}
487
488fn resolve_compiled_param(compiled: &CompiledParam) -> Option<String> {
490 match &compiled.program {
491 Some(program) => resolve_vrl(&compiled.value, program),
492 None => Some(compiled.value.clone()),
493 }
494}
495
496impl http_client::HttpClientContext for HttpClientContext {
497 fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
499 let mut buf = BytesMut::new();
501 buf.extend_from_slice(body);
502
503 let events = self.decode_events(&mut buf);
504
505 Some(events)
506 }
507
508 fn get_request_body(&self) -> Option<String> {
510 self.body.as_ref().and_then(resolve_compiled_param)
511 }
512
513 fn process_url(&self, url: &Uri) -> Option<Uri> {
515 if !self.query.has_vrl {
516 return None;
517 }
518
519 let processed_query: Option<HashMap<_, _>> = self
521 .query
522 .compiled
523 .iter()
524 .map(|(name, value)| {
525 let resolved = match value {
526 CompiledQueryParameterValue::SingleParam(param) => {
527 let result = resolve_compiled_param(param)?;
528 QueryParameterValue::SingleParam(ParameterValue::String(result))
529 }
530 CompiledQueryParameterValue::MultiParams(params) => {
531 let results: Option<Vec<_>> = params
532 .iter()
533 .map(|p| resolve_compiled_param(p).map(ParameterValue::String))
534 .collect();
535 QueryParameterValue::MultiParams(results?)
536 }
537 };
538 Some((name.clone(), resolved))
539 })
540 .collect();
541
542 let base_uri = Uri::builder()
544 .scheme(
545 url.scheme()
546 .cloned()
547 .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
548 )
549 .authority(
550 url.authority()
551 .cloned()
552 .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
553 )
554 .path_and_query(url.path().to_string())
555 .build()
556 .ok()?;
557
558 Some(build_url(&base_uri, &processed_query?))
559 }
560
561 fn enrich_events(&mut self, events: &mut Vec<Event>) {
563 let now = Utc::now();
564
565 for event in events {
566 match event {
567 Event::Log(log) => {
568 self.log_namespace.insert_standard_vector_source_metadata(
569 log,
570 HttpClientConfig::NAME,
571 now,
572 );
573 }
574 Event::Metric(metric) => {
575 if let Some(source_type_key) = log_schema().source_type_key() {
576 metric.replace_tag(
577 source_type_key.to_string(),
578 HttpClientConfig::NAME.to_string(),
579 );
580 }
581 }
582 Event::Trace(trace) => {
583 trace.maybe_insert(log_schema().source_type_key_target_path(), || {
584 Bytes::from(HttpClientConfig::NAME).into()
585 });
586 }
587 }
588 }
589 }
590}