1use std::{collections::HashMap, time::Duration};
5
6use bytes::{Bytes, BytesMut};
7use chrono::Utc;
8use futures_util::FutureExt;
9use http::{Uri, response::Parts};
10use serde_with::serde_as;
11use snafu::ResultExt;
12use tokio_util::codec::Decoder as _;
13use vector_lib::{
14 TimeZone,
15 codecs::{
16 StreamDecodingError,
17 decoding::{DeserializerConfig, FramingConfig},
18 },
19 compile_vrl,
20 config::{LogNamespace, SourceOutput, log_schema},
21 configurable::configurable_component,
22 event::{Event, LogEvent, VrlTarget},
23};
24use vrl::{
25 compiler::{CompileConfig, Function, Program, runtime::Runtime},
26 prelude::TypeState,
27};
28
29use crate::{
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext},
32 format_vrl_diagnostics,
33 http::{Auth, ParamType, ParameterValue, QueryParameterValue, QueryParameters},
34 serde::{default_decoding, default_framing_message_based},
35 sources,
36 sources::util::{
37 http::HttpMethod,
38 http_client,
39 http_client::{
40 GenericHttpClientInputs, HttpClientBuilder, build_url, call, default_interval,
41 default_timeout, warn_if_interval_too_low,
42 },
43 },
44 tls::{TlsConfig, TlsSettings},
45};
46
#[serde_as]
/// Configuration for the `http_client` source.
#[configurable_component(source(
    "http_client",
    "Pull observability data from an HTTP server at a configured interval."
))]
#[derive(Clone, Debug)]
pub struct HttpClientConfig {
    /// The HTTP endpoint to collect events from.
    ///
    /// The value is parsed as a URI when the source is built; an unparsable value fails the build.
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
    pub endpoint: String,

    /// The interval between scrapes, in whole seconds.
    ///
    /// Exposed in configuration as `scrape_interval_secs`.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub interval: Duration,

    /// The timeout for each scrape request, in (possibly fractional) seconds.
    ///
    /// Exposed in configuration as `scrape_timeout_secs`.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    pub timeout: Duration,

    /// Custom parameters for the HTTP request query string.
    ///
    /// One or more values for the same parameter key can be provided. Values typed as VRL are
    /// compiled when the source is built and evaluated again for every request.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "A query string parameter and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "query_examples()"))]
    pub query: QueryParameters,

    /// Decoder to use on the HTTP responses.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// Framing to use in the decoding.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    /// Headers to apply to the HTTP requests.
    ///
    /// One or more values for the same header can be provided.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value(s)."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: HashMap<String, Vec<String>>,

    /// Specifies the method of the HTTP request. Defaults to `GET`.
    #[serde(default = "default_http_method")]
    pub method: HttpMethod,

    /// TLS configuration. When absent, `TlsSettings::from_options` is called with `None`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// HTTP authentication configuration.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
130
/// Serde default for `HttpClientConfig::method`: requests use `GET` unless configured otherwise.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Get
}
134
135fn query_examples() -> QueryParameters {
136 HashMap::<_, _>::from_iter([
137 (
138 "field".to_owned(),
139 QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
140 ),
141 (
142 "fruit".to_owned(),
143 QueryParameterValue::MultiParams(vec![
144 ParameterValue::String("mango".to_owned()),
145 ParameterValue::String("papaya".to_owned()),
146 ParameterValue::String("kiwi".to_owned()),
147 ]),
148 ),
149 (
150 "start_time".to_owned(),
151 QueryParameterValue::SingleParam(ParameterValue::Typed {
152 value: "now()".to_owned(),
153 r#type: ParamType::Vrl,
154 }),
155 ),
156 ])
157}
158
/// Builds the example value referenced by the `docs::examples` metadata of the `headers` option.
///
/// Each header name maps to the list of values sent for it.
fn headers_examples() -> HashMap<String, Vec<String>> {
    // Turns a list of string literals into the owned `Vec<String>` the map stores.
    let owned = |values: &[&str]| -> Vec<String> {
        values.iter().map(|value| (*value).to_owned()).collect()
    };

    HashMap::from([
        ("Accept".to_owned(), owned(&["text/plain", "text/html"])),
        (
            "X-My-Custom-Header".to_owned(),
            owned(&["a", "vector", "of", "values"]),
        ),
    ])
}
176
177impl Default for HttpClientConfig {
178 fn default() -> Self {
179 Self {
180 endpoint: "http://localhost:9898/logs".to_string(),
181 query: HashMap::new(),
182 interval: default_interval(),
183 timeout: default_timeout(),
184 decoding: default_decoding(),
185 framing: default_framing_message_based(),
186 headers: HashMap::new(),
187 method: default_http_method(),
188 tls: None,
189 auth: None,
190 log_namespace: None,
191 }
192 }
193}
194
// NOTE(review): presumably derives the example-configuration generation for this source from
// its `Default` implementation; the macro is defined elsewhere in the crate — confirm.
impl_generate_config_from_default!(HttpClientConfig);
196
/// A single query parameter value together with its compiled VRL program, if any.
#[derive(Clone)]
pub struct CompiledParam {
    /// The configured value: the literal text, or the VRL source when the value is VRL-typed.
    value: String,
    /// `Some` only when the value is VRL-typed and compiled successfully; `None` otherwise,
    /// in which case `value` is used verbatim.
    program: Option<Program>,
}
202
/// Compiled counterpart of `QueryParameterValue`: one value or a list of values for a key.
#[derive(Clone)]
pub enum CompiledQueryParameterValue {
    /// A key with exactly one value.
    SingleParam(Box<CompiledParam>),
    /// A key with several values.
    MultiParams(Vec<CompiledParam>),
}
208
/// The query parameters of the source, in both configured and compiled form.
#[derive(Clone)]
pub struct Query {
    /// The parameters exactly as configured.
    original: HashMap<String, QueryParameterValue>,
    /// The same parameters with VRL-typed values compiled into programs.
    compiled: HashMap<String, CompiledQueryParameterValue>,
    /// `true` when at least one value in `compiled` carries a compiled VRL program.
    has_vrl: bool,
}
215
216impl Query {
217 pub fn new(params: &HashMap<String, QueryParameterValue>) -> Self {
218 let functions = vrl::stdlib::all()
219 .into_iter()
220 .chain(vector_lib::enrichment::vrl_functions())
221 .chain(vector_vrl_functions::all())
222 .collect::<Vec<_>>();
223
224 let compiled: HashMap<String, CompiledQueryParameterValue> = params
225 .iter()
226 .map(|(k, v)| (k.clone(), Self::compile_param(v, &functions)))
227 .collect();
228
229 let has_vrl = compiled.values().any(|compiled| match compiled {
230 CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
231 CompiledQueryParameterValue::MultiParams(params) => {
232 params.iter().any(|p| p.program.is_some())
233 }
234 });
235
236 Query {
237 original: params.clone(),
238 compiled,
239 has_vrl,
240 }
241 }
242
243 fn compile_value(param: &ParameterValue, functions: &[Box<dyn Function>]) -> CompiledParam {
244 let program = if param.is_vrl() {
245 let state = TypeState::default();
246 let config = CompileConfig::default();
247
248 match compile_vrl(param.value(), functions, &state, config) {
249 Ok(compilation_result) => {
250 if !compilation_result.warnings.is_empty() {
251 let warnings =
252 format_vrl_diagnostics(param.value(), compilation_result.warnings);
253 warn!(message = "VRL compilation warnings.", %warnings);
254 }
255 Some(compilation_result.program)
256 }
257 Err(diagnostics) => {
258 let error = format_vrl_diagnostics(param.value(), diagnostics);
259 warn!(message = "VRL compilation failed.", %error);
260 None
261 }
262 }
263 } else {
264 None
265 };
266
267 CompiledParam {
268 value: param.value().to_string(),
269 program,
270 }
271 }
272
273 fn compile_param(
274 value: &QueryParameterValue,
275 functions: &[Box<dyn Function>],
276 ) -> CompiledQueryParameterValue {
277 match value {
278 QueryParameterValue::SingleParam(param) => CompiledQueryParameterValue::SingleParam(
279 Box::new(Self::compile_value(param, functions)),
280 ),
281 QueryParameterValue::MultiParams(params) => {
282 let compiled = params
283 .iter()
284 .map(|p| Self::compile_value(p, functions))
285 .collect();
286 CompiledQueryParameterValue::MultiParams(compiled)
287 }
288 }
289 }
290}
291
292#[async_trait::async_trait]
293#[typetag::serde(name = "http_client")]
294impl SourceConfig for HttpClientConfig {
295 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
296 let query = Query::new(&self.query.clone());
297
298 let endpoints = [self.endpoint.clone()];
300 let urls: Vec<Uri> = endpoints
301 .iter()
302 .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
303 .map(|r| {
304 if query.has_vrl {
305 r
308 } else {
309 r.map(|uri| build_url(&uri, &query.original))
311 }
312 })
313 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
314
315 let tls = TlsSettings::from_options(self.tls.as_ref())?;
316
317 let log_namespace = cx.log_namespace(self.log_namespace);
318
319 let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
321
322 let content_type = self.decoding.content_type(&self.framing).to_string();
323
324 let context = HttpClientContext {
326 decoder,
327 log_namespace,
328 query,
329 };
330
331 warn_if_interval_too_low(self.timeout, self.interval);
332
333 let inputs = GenericHttpClientInputs {
334 urls,
335 interval: self.interval,
336 timeout: self.timeout,
337 headers: self.headers.clone(),
338 content_type,
339 auth: self.auth.clone(),
340 tls,
341 proxy: cx.proxy.clone(),
342 shutdown: cx.shutdown,
343 };
344
345 Ok(call(inputs, context, cx.out, self.method).boxed())
346 }
347
348 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
349 let log_namespace = global_log_namespace.merge(self.log_namespace);
352
353 let schema_definition = self
354 .decoding
355 .schema_definition(log_namespace)
356 .with_standard_vector_source_metadata();
357
358 vec![SourceOutput::new_maybe_logs(
359 self.decoding.output_type(),
360 schema_definition,
361 )]
362 }
363
364 fn can_acknowledge(&self) -> bool {
365 false
366 }
367}
368
369impl HttpClientConfig {
370 pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
371 let decoding = self.decoding.clone();
372 let framing = self.framing.clone();
373 let log_namespace =
374 log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
375
376 DecodingConfig::new(framing, decoding, log_namespace)
377 }
378}
379
/// Captures the configuration options required to decode the incoming requests into events.
#[derive(Clone)]
pub struct HttpClientContext {
    /// Decoder applied to every response body.
    pub decoder: Decoder,
    /// Namespace used when attaching source metadata to log events.
    pub log_namespace: LogNamespace,
    /// Configured query parameters, including any compiled VRL programs.
    query: Query,
}
387
388impl HttpClientContext {
389 fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
391 let mut events = Vec::new();
392 loop {
393 match self.decoder.decode_eof(buf) {
394 Ok(Some((next, _))) => {
395 events.extend(next);
396 }
397 Ok(None) => break,
398 Err(error) => {
399 if !error.can_continue() {
402 break;
403 }
404 break;
405 }
406 }
407 }
408 events
409 }
410}
411
impl HttpClientBuilder for HttpClientContext {
    type Context = HttpClientContext;

    /// Returns a clone of this context; the request URI is not used, so every URL gets an
    /// identical, independent context.
    fn build(&self, _uri: &Uri) -> Self::Context {
        self.clone()
    }
}
420
421fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
422 let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
423 let timezone = TimeZone::default();
424
425 Runtime::default()
426 .resolve(&mut target, program, &timezone)
427 .map_err(|error| {
428 warn!(message = "VRL runtime error.", source = %value, %error);
429 })
430 .ok()
431 .and_then(|vrl_value| {
432 let json_value = serde_json::to_value(vrl_value).ok()?;
433
434 let resolved_string = match json_value {
439 serde_json::Value::String(s) => s,
440 value => value.to_string(),
441 };
442 Some(resolved_string)
443 })
444}
445
446impl http_client::HttpClientContext for HttpClientContext {
447 fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
449 let mut buf = BytesMut::new();
451 buf.extend_from_slice(body);
452
453 let events = self.decode_events(&mut buf);
454
455 Some(events)
456 }
457
458 fn process_url(&self, url: &Uri) -> Option<Uri> {
460 let query: &Query = &self.query;
462 if !query.has_vrl {
463 return None;
464 }
465
466 let mut processed_query = HashMap::new();
467
468 for (param_name, compiled_value) in &query.compiled {
469 match compiled_value {
470 CompiledQueryParameterValue::SingleParam(compiled_param) => {
471 let result = match &compiled_param.program {
472 Some(prog) => resolve_vrl(&compiled_param.value, prog)?,
473 None => compiled_param.value.clone(),
474 };
475
476 processed_query.insert(
477 param_name.clone(),
478 QueryParameterValue::SingleParam(ParameterValue::String(result)),
479 );
480 }
481 CompiledQueryParameterValue::MultiParams(compiled_params) => {
482 let mut results = Vec::new();
483
484 for param in compiled_params {
485 let result = match ¶m.program {
486 Some(p) => resolve_vrl(¶m.value, p)?,
487 None => param.value.clone(),
488 };
489 results.push(ParameterValue::String(result));
490 }
491
492 processed_query.insert(
493 param_name.clone(),
494 QueryParameterValue::MultiParams(results),
495 );
496 }
497 };
498 }
499
500 let base_uri = Uri::builder()
502 .scheme(
503 url.scheme()
504 .cloned()
505 .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
506 )
507 .authority(
508 url.authority()
509 .cloned()
510 .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
511 )
512 .path_and_query(url.path().to_string())
513 .build()
514 .ok()?;
515
516 Some(build_url(&base_uri, &processed_query))
517 }
518
519 fn enrich_events(&mut self, events: &mut Vec<Event>) {
521 let now = Utc::now();
522
523 for event in events {
524 match event {
525 Event::Log(log) => {
526 self.log_namespace.insert_standard_vector_source_metadata(
527 log,
528 HttpClientConfig::NAME,
529 now,
530 );
531 }
532 Event::Metric(metric) => {
533 if let Some(source_type_key) = log_schema().source_type_key() {
534 metric.replace_tag(
535 source_type_key.to_string(),
536 HttpClientConfig::NAME.to_string(),
537 );
538 }
539 }
540 Event::Trace(trace) => {
541 trace.maybe_insert(log_schema().source_type_key_target_path(), || {
542 Bytes::from(HttpClientConfig::NAME).into()
543 });
544 }
545 }
546 }
547 }
548}