1use bytes::{Bytes, BytesMut};
5use chrono::Utc;
6use futures_util::FutureExt;
7use http::{response::Parts, Uri};
8use serde_with::serde_as;
9use snafu::ResultExt;
10use std::{collections::HashMap, time::Duration};
11use tokio_util::codec::Decoder as _;
12use vrl::diagnostic::Formatter;
13
14use crate::http::{ParamType, ParameterValue, QueryParameterValue, QueryParameters};
15use crate::sources::util::http_client;
16use crate::{
17 codecs::{Decoder, DecodingConfig},
18 config::{SourceConfig, SourceContext},
19 http::Auth,
20 serde::{default_decoding, default_framing_message_based},
21 sources,
22 sources::util::{
23 http::HttpMethod,
24 http_client::{
25 build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
26 GenericHttpClientInputs, HttpClientBuilder,
27 },
28 },
29 tls::{TlsConfig, TlsSettings},
30};
31use vector_lib::codecs::{
32 decoding::{DeserializerConfig, FramingConfig},
33 StreamDecodingError,
34};
35use vector_lib::config::{log_schema, LogNamespace, SourceOutput};
36use vector_lib::configurable::configurable_component;
37use vector_lib::{
38 compile_vrl,
39 event::{Event, LogEvent, VrlTarget},
40 TimeZone,
41};
42use vrl::{
43 compiler::{runtime::Runtime, CompileConfig, Function, Program},
44 prelude::TypeState,
45};
46
47#[serde_as]
49#[configurable_component(source(
50 "http_client",
51 "Pull observability data from an HTTP server at a configured interval."
52))]
53#[derive(Clone, Debug)]
54pub struct HttpClientConfig {
55 #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
59 pub endpoint: String,
60
61 #[serde(default = "default_interval")]
65 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66 #[serde(rename = "scrape_interval_secs")]
67 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
68 pub interval: Duration,
69
70 #[serde(default = "default_timeout")]
72 #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
73 #[serde(rename = "scrape_timeout_secs")]
74 #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
75 pub timeout: Duration,
76
77 #[serde(default)]
88 #[configurable(metadata(
89 docs::additional_props_description = "A query string parameter and its value(s)."
90 ))]
91 #[configurable(metadata(docs::examples = "query_examples()"))]
92 pub query: QueryParameters,
93
94 #[configurable(derived)]
95 #[serde(default = "default_decoding")]
96 pub decoding: DeserializerConfig,
97
98 #[configurable(derived)]
100 #[serde(default = "default_framing_message_based")]
101 pub framing: FramingConfig,
102
103 #[serde(default)]
107 #[configurable(metadata(
108 docs::additional_props_description = "An HTTP request header and its value(s)."
109 ))]
110 #[configurable(metadata(docs::examples = "headers_examples()"))]
111 pub headers: HashMap<String, Vec<String>>,
112
113 #[serde(default = "default_http_method")]
115 pub method: HttpMethod,
116
117 #[configurable(derived)]
119 pub tls: Option<TlsConfig>,
120
121 #[configurable(derived)]
123 pub auth: Option<Auth>,
124
125 #[configurable(metadata(docs::hidden))]
127 #[serde(default)]
128 pub log_namespace: Option<bool>,
129}
130
131const fn default_http_method() -> HttpMethod {
132 HttpMethod::Get
133}
134
135fn query_examples() -> QueryParameters {
136 HashMap::<_, _>::from_iter([
137 (
138 "field".to_owned(),
139 QueryParameterValue::SingleParam(ParameterValue::String("value".to_owned())),
140 ),
141 (
142 "fruit".to_owned(),
143 QueryParameterValue::MultiParams(vec![
144 ParameterValue::String("mango".to_owned()),
145 ParameterValue::String("papaya".to_owned()),
146 ParameterValue::String("kiwi".to_owned()),
147 ]),
148 ),
149 (
150 "start_time".to_owned(),
151 QueryParameterValue::SingleParam(ParameterValue::Typed {
152 value: "now()".to_owned(),
153 r#type: ParamType::Vrl,
154 }),
155 ),
156 ])
157}
158
/// Example value for the `headers` option used in the generated documentation.
///
/// Each header maps to one or more values.
fn headers_examples() -> HashMap<String, Vec<String>> {
    let owned = |values: &[&str]| -> Vec<String> {
        values.iter().map(|value| (*value).to_owned()).collect()
    };

    let mut headers = HashMap::new();
    headers.insert("Accept".to_owned(), owned(&["text/plain", "text/html"]));
    headers.insert(
        "X-My-Custom-Header".to_owned(),
        owned(&["a", "vector", "of", "values"]),
    );
    headers
}
176
177impl Default for HttpClientConfig {
178 fn default() -> Self {
179 Self {
180 endpoint: "http://localhost:9898/logs".to_string(),
181 query: HashMap::new(),
182 interval: default_interval(),
183 timeout: default_timeout(),
184 decoding: default_decoding(),
185 framing: default_framing_message_based(),
186 headers: HashMap::new(),
187 method: default_http_method(),
188 tls: None,
189 auth: None,
190 log_namespace: None,
191 }
192 }
193}
194
195impl_generate_config_from_default!(HttpClientConfig);
196
197#[derive(Clone)]
198pub struct CompiledParam {
199 value: String,
200 program: Option<Program>,
201}
202
203#[derive(Clone)]
204pub enum CompiledQueryParameterValue {
205 SingleParam(Box<CompiledParam>),
206 MultiParams(Vec<CompiledParam>),
207}
208
209#[derive(Clone)]
210pub struct Query {
211 original: HashMap<String, QueryParameterValue>,
212 compiled: HashMap<String, CompiledQueryParameterValue>,
213 has_vrl: bool,
214}
215
216impl Query {
217 pub fn new(params: &HashMap<String, QueryParameterValue>) -> Self {
218 let functions = vrl::stdlib::all()
219 .into_iter()
220 .chain(vector_lib::enrichment::vrl_functions())
221 .chain(vector_vrl_functions::all())
222 .collect::<Vec<_>>();
223
224 let compiled: HashMap<String, CompiledQueryParameterValue> = params
225 .iter()
226 .map(|(k, v)| (k.clone(), Self::compile_param(v, &functions)))
227 .collect();
228
229 let has_vrl = compiled.values().any(|compiled| match compiled {
230 CompiledQueryParameterValue::SingleParam(param) => param.program.is_some(),
231 CompiledQueryParameterValue::MultiParams(params) => {
232 params.iter().any(|p| p.program.is_some())
233 }
234 });
235
236 Query {
237 original: params.clone(),
238 compiled,
239 has_vrl,
240 }
241 }
242
243 fn compile_value(param: &ParameterValue, functions: &[Box<dyn Function>]) -> CompiledParam {
244 let program = if param.is_vrl() {
245 let state = TypeState::default();
246 let config = CompileConfig::default();
247
248 match compile_vrl(param.value(), functions, &state, config) {
249 Ok(compilation_result) => {
250 if !compilation_result.warnings.is_empty() {
251 let warnings = Formatter::new(param.value(), compilation_result.warnings)
252 .colored()
253 .to_string();
254 warn!(message = "VRL compilation warnings.", %warnings, internal_log_rate_limit = true);
255 }
256 Some(compilation_result.program)
257 }
258 Err(diagnostics) => {
259 let error = Formatter::new(param.value(), diagnostics)
260 .colored()
261 .to_string();
262 warn!(message = "VRL compilation failed.", %error, internal_log_rate_limit = true);
263 None
264 }
265 }
266 } else {
267 None
268 };
269
270 CompiledParam {
271 value: param.value().to_string(),
272 program,
273 }
274 }
275
276 fn compile_param(
277 value: &QueryParameterValue,
278 functions: &[Box<dyn Function>],
279 ) -> CompiledQueryParameterValue {
280 match value {
281 QueryParameterValue::SingleParam(param) => CompiledQueryParameterValue::SingleParam(
282 Box::new(Self::compile_value(param, functions)),
283 ),
284 QueryParameterValue::MultiParams(params) => {
285 let compiled = params
286 .iter()
287 .map(|p| Self::compile_value(p, functions))
288 .collect();
289 CompiledQueryParameterValue::MultiParams(compiled)
290 }
291 }
292 }
293}
294
295#[async_trait::async_trait]
296#[typetag::serde(name = "http_client")]
297impl SourceConfig for HttpClientConfig {
298 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
299 let query = Query::new(&self.query.clone());
300
301 let endpoints = [self.endpoint.clone()];
303 let urls: Vec<Uri> = endpoints
304 .iter()
305 .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
306 .map(|r| {
307 if query.has_vrl {
308 r
311 } else {
312 r.map(|uri| build_url(&uri, &query.original))
314 }
315 })
316 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
317
318 let tls = TlsSettings::from_options(self.tls.as_ref())?;
319
320 let log_namespace = cx.log_namespace(self.log_namespace);
321
322 let decoder = self.get_decoding_config(Some(log_namespace)).build()?;
324
325 let content_type = self.decoding.content_type(&self.framing).to_string();
326
327 let context = HttpClientContext {
329 decoder,
330 log_namespace,
331 query,
332 };
333
334 warn_if_interval_too_low(self.timeout, self.interval);
335
336 let inputs = GenericHttpClientInputs {
337 urls,
338 interval: self.interval,
339 timeout: self.timeout,
340 headers: self.headers.clone(),
341 content_type,
342 auth: self.auth.clone(),
343 tls,
344 proxy: cx.proxy.clone(),
345 shutdown: cx.shutdown,
346 };
347
348 Ok(call(inputs, context, cx.out, self.method).boxed())
349 }
350
351 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
352 let log_namespace = global_log_namespace.merge(self.log_namespace);
355
356 let schema_definition = self
357 .decoding
358 .schema_definition(log_namespace)
359 .with_standard_vector_source_metadata();
360
361 vec![SourceOutput::new_maybe_logs(
362 self.decoding.output_type(),
363 schema_definition,
364 )]
365 }
366
367 fn can_acknowledge(&self) -> bool {
368 false
369 }
370}
371
372impl HttpClientConfig {
373 pub fn get_decoding_config(&self, log_namespace: Option<LogNamespace>) -> DecodingConfig {
374 let decoding = self.decoding.clone();
375 let framing = self.framing.clone();
376 let log_namespace =
377 log_namespace.unwrap_or_else(|| self.log_namespace.unwrap_or(false).into());
378
379 DecodingConfig::new(framing, decoding, log_namespace)
380 }
381}
382
383#[derive(Clone)]
385pub struct HttpClientContext {
386 pub decoder: Decoder,
387 pub log_namespace: LogNamespace,
388 query: Query,
389}
390
391impl HttpClientContext {
392 fn decode_events(&mut self, buf: &mut BytesMut) -> Vec<Event> {
394 let mut events = Vec::new();
395 loop {
396 match self.decoder.decode_eof(buf) {
397 Ok(Some((next, _))) => {
398 events.extend(next);
399 }
400 Ok(None) => break,
401 Err(error) => {
402 if !error.can_continue() {
405 break;
406 }
407 break;
408 }
409 }
410 }
411 events
412 }
413}
414
415impl HttpClientBuilder for HttpClientContext {
416 type Context = HttpClientContext;
417
418 fn build(&self, _uri: &Uri) -> Self::Context {
420 self.clone()
421 }
422}
423
424fn resolve_vrl(value: &str, program: &Program) -> Option<String> {
425 let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
426 let timezone = TimeZone::default();
427
428 Runtime::default()
429 .resolve(&mut target, program, &timezone)
430 .map_err(|error| {
431 warn!(message = "VRL runtime error.", source = %value, %error, internal_log_rate_limit = true);
432 })
433 .ok()
434 .and_then(|vrl_value| {
435 let json_value = serde_json::to_value(vrl_value).ok()?;
436
437 let resolved_string = match json_value {
442 serde_json::Value::String(s) => s,
443 value => value.to_string(),
444 };
445 Some(resolved_string)
446 })
447}
448
449impl http_client::HttpClientContext for HttpClientContext {
450 fn on_response(&mut self, _url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
452 let mut buf = BytesMut::new();
454 buf.extend_from_slice(body);
455
456 let events = self.decode_events(&mut buf);
457
458 Some(events)
459 }
460
461 fn process_url(&self, url: &Uri) -> Option<Uri> {
463 let query: &Query = &self.query;
465 if !query.has_vrl {
466 return None;
467 }
468
469 let mut processed_query = HashMap::new();
470
471 for (param_name, compiled_value) in &query.compiled {
472 match compiled_value {
473 CompiledQueryParameterValue::SingleParam(compiled_param) => {
474 let result = match &compiled_param.program {
475 Some(prog) => resolve_vrl(&compiled_param.value, prog)?,
476 None => compiled_param.value.clone(),
477 };
478
479 processed_query.insert(
480 param_name.clone(),
481 QueryParameterValue::SingleParam(ParameterValue::String(result)),
482 );
483 }
484 CompiledQueryParameterValue::MultiParams(compiled_params) => {
485 let mut results = Vec::new();
486
487 for param in compiled_params {
488 let result = match ¶m.program {
489 Some(p) => resolve_vrl(¶m.value, p)?,
490 None => param.value.clone(),
491 };
492 results.push(ParameterValue::String(result));
493 }
494
495 processed_query.insert(
496 param_name.clone(),
497 QueryParameterValue::MultiParams(results),
498 );
499 }
500 };
501 }
502
503 let base_uri = Uri::builder()
505 .scheme(
506 url.scheme()
507 .cloned()
508 .unwrap_or_else(|| http::uri::Scheme::try_from("http").unwrap()),
509 )
510 .authority(
511 url.authority()
512 .cloned()
513 .unwrap_or_else(|| http::uri::Authority::try_from("localhost").unwrap()),
514 )
515 .path_and_query(url.path().to_string())
516 .build()
517 .ok()?;
518
519 Some(build_url(&base_uri, &processed_query))
520 }
521
522 fn enrich_events(&mut self, events: &mut Vec<Event>) {
524 let now = Utc::now();
525
526 for event in events {
527 match event {
528 Event::Log(log) => {
529 self.log_namespace.insert_standard_vector_source_metadata(
530 log,
531 HttpClientConfig::NAME,
532 now,
533 );
534 }
535 Event::Metric(metric) => {
536 if let Some(source_type_key) = log_schema().source_type_key() {
537 metric.replace_tag(
538 source_type_key.to_string(),
539 HttpClientConfig::NAME.to_string(),
540 );
541 }
542 }
543 Event::Trace(trace) => {
544 trace.maybe_insert(log_schema().source_type_key_target_path(), || {
545 Bytes::from(HttpClientConfig::NAME).into()
546 });
547 }
548 }
549 }
550 }
551}