#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
mod integration_tests;
#[cfg(test)]
mod tests;

pub mod logs;
pub mod metrics;
pub mod traces;

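// Protobuf types for the Datadog Agent metric and trace payloads, generated into `OUT_DIR`
// at build time.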
#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddmetric_proto {
    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}

#[allow(warnings)]
pub(crate) mod ddtrace_proto {
    include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
}

use std::{convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc, time::Duration};

use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc, serde::ts_milliseconds};
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use futures::FutureExt;
use http::StatusCode;
use hyper::{Server, service::make_service_fn};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use snafu::Snafu;
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::{
    codecs::decoding::{DeserializerConfig, FramingConfig},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    event::{BatchNotifier, BatchStatus},
    internal_event::{EventsReceived, Registered},
    lookup::owned_value_path,
    schema::meaning,
    source_sender::SendError,
    tls::MaybeTlsIncomingStream,
};
use vrl::{
    path::OwnedTargetPath,
    value::{Kind, kind::Collection},
};
use warp::{Filter, Reply, filters::BoxedFilter, reject::Rejection, reply::Response};

use crate::{
    SourceSender,
    codecs::{Decoder, DecodingConfig},
    common::http::ErrorMessage,
    config::{
        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
        SourceContext, SourceOutput, log_schema,
    },
    event::Event,
    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
    internal_events::{HttpBytesReceived, StreamClosedError},
    schema,
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{self, util::http::emit_decompress_error},
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};

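// Named output ports used when `multiple_outputs` is enabled.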
pub const LOGS: &str = "logs";
pub const METRICS: &str = "metrics";
pub const TRACES: &str = "traces";

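/// Configuration for the `datadog_agent` source.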
#[configurable_component(source(
    "datadog_agent",
    "Receive logs, metrics, and traces collected by a Datadog Agent."
))]
#[serde_as]
#[derive(Clone, Debug)]
pub struct DatadogAgentConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// When enabled, the API key sent by the Datadog Agent is stored in event metadata so that
    /// downstream Datadog components can reuse it.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_true")]
    store_api_key: bool,

    /// When enabled, log events are not accepted by this source.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_logs: bool,

    /// When enabled, metric events are not accepted by this source.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_metrics: bool,

    /// When enabled, trace events are not accepted by this source.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_traces: bool,

    /// When enabled, logs, metrics, and traces are routed to separate `logs`, `metrics`, and
    /// `traces` outputs instead of the single default output.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    multiple_outputs: bool,

    /// When enabled, the `ddtags` field of incoming logs is parsed from its comma-separated
    /// string form into an array of tags.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    parse_ddtags: bool,

    /// When enabled, the portion of a metric name before the first `.` is treated as the
    /// metric namespace.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_true")]
    split_metric_namespace: bool,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    /// The timeout, in seconds, for sending events downstream. Requests that exceed it are
    /// answered with `503 Service Unavailable`.
    #[serde_as(as = "Option<serde_with::DurationSecondsWithFrac<f64>>")]
    send_timeout_secs: Option<f64>,
}

impl GenerateConfig for DatadogAgentConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            address: "0.0.0.0:8080".parse().unwrap(),
            tls: None,
            store_api_key: true,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            disable_logs: false,
            disable_metrics: false,
            disable_traces: false,
            multiple_outputs: false,
            parse_ddtags: false,
            split_metric_namespace: true,
            log_namespace: Some(false),
            keepalive: KeepaliveConfig::default(),
            send_timeout_secs: None,
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "datadog_agent")]
impl SourceConfig for DatadogAgentConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let logs_schema_definition = cx
            .schema_definitions
            .get(&Some(LOGS.to_owned()))
            .or_else(|| cx.schema_definitions.get(&None))
            .cloned();

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let source = DatadogAgentSource::new(
            self.store_api_key,
            decoder,
            tls.http_protocol_name(),
            logs_schema_definition,
            log_namespace,
            self.parse_ddtags,
            self.split_metric_namespace,
        );
        let listener = tls.bind(&self.address).await?;
        let handler = RequestHandler {
            acknowledgements: cx.do_acknowledgements(self.acknowledgements),
            multiple_outputs: self.multiple_outputs,
            out: cx.out,
        };
        let filters = source.build_warp_filters(handler, self)?;
        let shutdown = cx.shutdown;
        let keepalive_settings = self.keepalive.clone();

        info!(message = "Building HTTP server.", address = %self.address);

        Ok(Box::pin(async move {
            let routes = filters.recover(|r: Rejection| async move {
                if let Some(e_msg) = r.find::<ErrorMessage>() {
                    let json = warp::reply::json(e_msg);
                    Ok(warp::reply::with_status(json, e_msg.status_code()))
                } else {
                    Err(r)
                }
            });

            let span = Span::current();
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(routes.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

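    // Builds the log schema definition, including the Datadog-specific fields (`status`,
    // `timestamp`, `hostname`, `service`, `ddsource`, `ddtags`), and declares either a single
    // default output or one named port per enabled data type.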
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let definition = self
            .decoding
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
                &owned_value_path!("status"),
                Kind::bytes(),
                Some(meaning::SEVERITY),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some(meaning::TIMESTAMP),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                &owned_value_path!("hostname"),
                Kind::bytes(),
                Some(meaning::HOST),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                &owned_value_path!("service"),
                Kind::bytes(),
                Some(meaning::SERVICE),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
                &owned_value_path!("ddsource"),
                Kind::bytes(),
                Some(meaning::SOURCE),
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                &owned_value_path!("ddtags"),
                if self.parse_ddtags {
                    Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
                } else {
                    Kind::bytes()
                },
                Some(meaning::TAGS),
            )
            .with_standard_vector_source_metadata();

        let mut output = Vec::with_capacity(1);

        if self.multiple_outputs {
            if !self.disable_logs {
                output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
            }
            if !self.disable_metrics {
                output.push(SourceOutput::new_metrics().with_port(METRICS))
            }
            if !self.disable_traces {
                output.push(SourceOutput::new_traces().with_port(TRACES))
            }
        } else {
            output.push(SourceOutput::new_maybe_logs(
                DataType::all_bits(),
                definition,
            ))
        }
        output
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }

    fn send_timeout(&self) -> Option<Duration> {
        self.send_timeout_secs.map(Duration::from_secs_f64)
    }
}

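/// Rejection raised when the source is shutting down and events can no longer be forwarded.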
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    ServerShutdown,
}

impl warp::reject::Reject for ApiError {}

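/// Query parameters used by the Datadog Agent to pass the API key in the request URL.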
#[derive(Deserialize)]
pub struct ApiKeyQueryParams {
    #[serde(rename = "dd-api-key")]
    pub dd_api_key: Option<String>,
}

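/// Shared state handed to the warp filters that serve the logs, metrics, and traces endpoints.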
#[derive(Clone)]
pub(crate) struct DatadogAgentSource {
    pub(crate) api_key_extractor: ApiKeyExtractor,
    pub(crate) log_schema_host_key: OwnedTargetPath,
    pub(crate) log_schema_source_type_key: OwnedTargetPath,
    pub(crate) log_namespace: LogNamespace,
    pub(crate) decoder: Decoder,
    protocol: &'static str,
    logs_schema_definition: Option<Arc<schema::Definition>>,
    events_received: Registered<EventsReceived>,
    parse_ddtags: bool,
    split_metric_namespace: bool,
}

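/// Extracts the Datadog API key from the request path, query parameters, or headers.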
#[derive(Clone)]
pub struct ApiKeyExtractor {
    matcher: Regex,
    store_api_key: bool,
}

impl ApiKeyExtractor {
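    /// Returns the API key, preferring a key embedded in the request path, then the
    /// `dd-api-key` query parameter, and finally the API key header. Returns `None` when
    /// `store_api_key` is disabled.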
    pub fn extract(
        &self,
        path: &str,
        header: Option<String>,
        query_params: Option<String>,
    ) -> Option<Arc<str>> {
        if !self.store_api_key {
            return None;
        }
        self.matcher
            .captures(path)
            .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
            .or_else(|| query_params.map(Arc::from))
            .or_else(|| header.map(Arc::from))
    }
}

impl DatadogAgentSource {
    pub(crate) fn new(
        store_api_key: bool,
        decoder: Decoder,
        protocol: &'static str,
        logs_schema_definition: Option<schema::Definition>,
        log_namespace: LogNamespace,
        parse_ddtags: bool,
        split_metric_namespace: bool,
    ) -> Self {
        Self {
            api_key_extractor: ApiKeyExtractor {
                store_api_key,
                matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
                    .expect("static regex always compiles"),
            },
            log_schema_host_key: log_schema()
                .host_key_target_path()
                .expect("global log_schema.host_key to be valid path")
                .clone(),
            log_schema_source_type_key: log_schema()
                .source_type_key_target_path()
                .expect("global log_schema.source_type_key to be valid path")
                .clone(),
            decoder,
            protocol,
            logs_schema_definition: logs_schema_definition.map(Arc::new),
            log_namespace,
            events_received: register!(EventsReceived),
            parse_ddtags,
            split_metric_namespace,
        }
    }

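    /// Combines the warp filters for each data type that is not disabled in the configuration.
    /// Returns an error if logs, metrics, and traces are all disabled.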
    fn build_warp_filters(
        &self,
        handler: RequestHandler,
        config: &DatadogAgentConfig,
    ) -> crate::Result<BoxedFilter<(Response,)>> {
        let mut filters =
            (!config.disable_logs).then(|| logs::build_warp_filter(handler.clone(), self.clone()));

        if !config.disable_traces {
            let trace_filter = traces::build_warp_filter(handler.clone(), self.clone());
            filters = filters
                .map(|f| f.or(trace_filter.clone()).unify().boxed())
                .or(Some(trace_filter));
        }

        if !config.disable_metrics {
            let metrics_filter = metrics::build_warp_filter(handler, self.clone());
            filters = filters
                .map(|f| f.or(metrics_filter.clone()).unify().boxed())
                .or(Some(metrics_filter));
        }

        filters.ok_or_else(|| "At least one of the supported data types must be enabled".into())
    }

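    /// Decompresses the request body according to the `Content-Encoding` header, applying the
    /// listed encodings from right to left. `identity`, `gzip`, `zstd`, and `deflate` are
    /// supported; any other encoding is rejected with `415 Unsupported Media Type`.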
    pub(crate) fn decode(
        &self,
        header: &Option<String>,
        mut body: Bytes,
        path: &str,
    ) -> Result<Bytes, ErrorMessage> {
        if let Some(encodings) = header {
            for encoding in encodings.rsplit(',').map(str::trim) {
                body = match encoding {
                    "identity" => body,
                    "gzip" | "x-gzip" => {
                        let mut decoded = Vec::new();
                        MultiGzDecoder::new(body.reader())
                            .read_to_end(&mut decoded)
                            .map_err(|error| emit_decompress_error(encoding, error))?;
                        decoded.into()
                    }
                    "zstd" => {
                        let mut decoded = Vec::new();
                        zstd::stream::copy_decode(body.reader(), &mut decoded)
                            .map_err(|error| emit_decompress_error(encoding, error))?;
                        decoded.into()
                    }
                    "deflate" | "x-deflate" => {
                        let mut decoded = Vec::new();
                        ZlibDecoder::new(body.reader())
                            .read_to_end(&mut decoded)
                            .map_err(|error| emit_decompress_error(encoding, error))?;
                        decoded.into()
                    }
                    encoding => {
                        return Err(ErrorMessage::new(
                            StatusCode::UNSUPPORTED_MEDIA_TYPE,
                            format!("Unsupported encoding {encoding}"),
                        ));
                    }
                }
            }
        }
        emit!(HttpBytesReceived {
            byte_size: body.len(),
            http_path: path,
            protocol: self.protocol,
        });
        Ok(body)
    }
}

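/// Shared handle used by the endpoint filters to forward decoded events into the topology.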
#[derive(Clone)]
struct RequestHandler {
    acknowledgements: bool,
    multiple_outputs: bool,
    out: SourceSender,
}

impl RequestHandler {
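    /// Converts decoding failures into warp rejections and forwards successfully decoded
    /// events to the topology.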
    async fn handle_request(
        mut self,
        events: Result<Vec<Event>, ErrorMessage>,
        output: &'static str,
    ) -> Result<Response, Rejection> {
        match events {
            Ok(events) => self.handle_events(events, output).await,
            Err(err) => Err(warp::reject::custom(err)),
        }
    }

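    /// Sends the batch downstream, using the named output port when `multiple_outputs` is
    /// enabled. When end-to-end acknowledgements are enabled, the HTTP response reflects the
    /// delivery status of the batch.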
    async fn handle_events(
        &mut self,
        mut events: Vec<Event>,
        output: &'static str,
    ) -> Result<Response, Rejection> {
        let receiver = BatchNotifier::maybe_apply_to(self.acknowledgements, &mut events);
        let count = events.len();
        let output = self.multiple_outputs.then_some(output);

        let result = if let Some(name) = output {
            self.out.send_batch_named(name, events).await
        } else {
            self.out.send_batch(events).await
        };
        match result {
            Ok(()) => {}
            Err(SendError::Closed) => {
                emit!(StreamClosedError { count });
                return Err(warp::reject::custom(ApiError::ServerShutdown));
            }
            Err(SendError::Timeout) => {
                return Ok(warp::reply::with_status(
                    "Service unavailable",
                    StatusCode::SERVICE_UNAVAILABLE,
                )
                .into_response());
            }
        }
        match receiver {
            None => Ok(warp::reply().into_response()),
            Some(receiver) => match receiver.await {
                BatchStatus::Delivered => Ok(warp::reply().into_response()),
                BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "Error delivering contents to sink".into(),
                ))),
                BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
                    StatusCode::BAD_REQUEST,
                    "Contents failed to deliver to sink".into(),
                ))),
            },
        }
    }
}

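/// A single log event as submitted by the Datadog Agent in its JSON log payload.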
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct LogMsg {
    pub message: Bytes,
    pub status: Bytes,
    #[serde(
        deserialize_with = "ts_milliseconds::deserialize",
        serialize_with = "ts_milliseconds::serialize"
    )]
    pub timestamp: DateTime<Utc>,
    pub hostname: Bytes,
    pub service: Bytes,
    pub ddsource: Bytes,
    pub ddtags: Bytes,