// Integration tests (presumably against a running Datadog Agent); compiled only
// for test builds with the `datadog-agent-integration-tests` feature enabled.
#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
mod integration_tests;
// Unit tests for this source.
#[cfg(test)]
mod tests;

// Per-data-type request handling. Each module provides the `build_warp_filter`
// that `DatadogAgentSource::build_warp_filters` combines into one route set.
pub mod logs;
pub mod metrics;
pub mod traces;
9
// Code generated into `OUT_DIR` at build time (the `_proto` naming suggests
// protobuf bindings for the agent metric payloads). Generated code is not held
// to this crate's lint settings, so rustc warnings and the opt-in clippy groups
// are silenced here.
#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddmetric_proto {
    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}
14
15#[allow(warnings)]
16pub(crate) mod ddtrace_proto {
17 include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
18}
19
20use std::{convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc, time::Duration};
21
22use bytes::{Buf, Bytes};
23use chrono::{DateTime, Utc, serde::ts_milliseconds};
24use flate2::read::{MultiGzDecoder, ZlibDecoder};
25use futures::FutureExt;
26use http::StatusCode;
27use hyper::{Server, service::make_service_fn};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::Snafu;
31use tokio::net::TcpStream;
32use tower::ServiceBuilder;
33use tracing::Span;
34use vector_lib::{
35 codecs::decoding::{DeserializerConfig, FramingConfig},
36 config::{LegacyKey, LogNamespace},
37 configurable::configurable_component,
38 event::{BatchNotifier, BatchStatus},
39 internal_event::{EventsReceived, Registered},
40 lookup::owned_value_path,
41 schema::meaning,
42 tls::MaybeTlsIncomingStream,
43};
44use vrl::{
45 path::OwnedTargetPath,
46 value::{Kind, kind::Collection},
47};
48use warp::{Filter, Reply, filters::BoxedFilter, reject::Rejection, reply::Response};
49
50use crate::{
51 SourceSender,
52 codecs::{Decoder, DecodingConfig},
53 common::http::ErrorMessage,
54 config::{
55 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
56 SourceContext, SourceOutput, log_schema,
57 },
58 event::Event,
59 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
60 internal_events::{HttpBytesReceived, StreamClosedError},
61 schema,
62 serde::{bool_or_struct, default_decoding, default_framing_message_based},
63 sources::{self, util::http::emit_decompress_error},
64 tls::{MaybeTlsSettings, TlsEnableableConfig},
65};
66
/// Name of the logs output port, declared when `multiple_outputs` is enabled.
/// It is also the key used to look up the logs schema definition.
pub const LOGS: &str = "logs";
/// Name of the metrics output port, declared when `multiple_outputs` is enabled.
pub const METRICS: &str = "metrics";
/// Name of the traces output port, declared when `multiple_outputs` is enabled.
pub const TRACES: &str = "traces";
70
#[configurable_component(source(
    "datadog_agent",
    "Receive logs, metrics, and traces collected by a Datadog Agent."
))]
/// Configuration for the `datadog_agent` source.
#[derive(Clone, Debug)]
pub struct DatadogAgentConfig {
    /// The socket address to accept connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// If `true`, a Datadog API key found in the request URL path, the `dd-api-key`
    /// query parameter, or a header is extracted. If `false`, no key is extracted.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_true")]
    store_api_key: bool,

    /// If `true`, the logs endpoint is not served and no logs output is declared.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_logs: bool,

    /// If `true`, the metrics endpoint is not served and no metrics output is declared.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_metrics: bool,

    /// If `true`, the traces endpoint is not served and no traces output is declared.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_traces: bool,

    /// If `true`, logs, metrics, and traces are sent to separate `logs`, `metrics`,
    /// and `traces` outputs. Otherwise all of them go to the single default output.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    multiple_outputs: bool,

    /// If `true`, the `ddtags` field is exposed as an array of strings instead of a
    /// single string.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    parse_ddtags: bool,

    /// Controls splitting of the metric namespace in the metrics handler.
    // NOTE(review): the exact splitting rule lives in the `metrics` module, which
    // is not visible here. Confirm it before expanding this description.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_true")]
    split_metric_namespace: bool,

    /// The namespace to use for logs. When set, it is merged with the global log
    /// namespace setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
154
155impl GenerateConfig for DatadogAgentConfig {
156 fn generate_config() -> toml::Value {
157 toml::Value::try_from(Self {
158 address: "0.0.0.0:8080".parse().unwrap(),
159 tls: None,
160 store_api_key: true,
161 framing: default_framing_message_based(),
162 decoding: default_decoding(),
163 acknowledgements: SourceAcknowledgementsConfig::default(),
164 disable_logs: false,
165 disable_metrics: false,
166 disable_traces: false,
167 multiple_outputs: false,
168 parse_ddtags: false,
169 split_metric_namespace: true,
170 log_namespace: Some(false),
171 keepalive: KeepaliveConfig::default(),
172 })
173 .unwrap()
174 }
175}
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "datadog_agent")]
179impl SourceConfig for DatadogAgentConfig {
180 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
181 let log_namespace = cx.log_namespace(self.log_namespace);
182
183 let logs_schema_definition = cx
184 .schema_definitions
185 .get(&Some(LOGS.to_owned()))
186 .or_else(|| cx.schema_definitions.get(&None))
187 .cloned();
188
189 let decoder =
190 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
191 .build()?;
192
193 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
194 let source = DatadogAgentSource::new(
195 self.store_api_key,
196 decoder,
197 tls.http_protocol_name(),
198 logs_schema_definition,
199 log_namespace,
200 self.parse_ddtags,
201 self.split_metric_namespace,
202 );
203 let listener = tls.bind(&self.address).await?;
204 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
205 let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
206 let shutdown = cx.shutdown;
207 let keepalive_settings = self.keepalive.clone();
208
209 info!(message = "Building HTTP server.", address = %self.address);
210
211 Ok(Box::pin(async move {
212 let routes = filters.recover(|r: Rejection| async move {
213 if let Some(e_msg) = r.find::<ErrorMessage>() {
214 let json = warp::reply::json(e_msg);
215 Ok(warp::reply::with_status(json, e_msg.status_code()))
216 } else {
217 Err(r)
219 }
220 });
221
222 let span = Span::current();
223 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
224 let svc = ServiceBuilder::new()
225 .layer(build_http_trace_layer(span.clone()))
226 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
227 MaxConnectionAgeLayer::new(
228 Duration::from_secs(secs),
229 keepalive_settings.max_connection_age_jitter_factor,
230 conn.peer_addr(),
231 )
232 }))
233 .service(warp::service(routes.clone()));
234 futures_util::future::ok::<_, Infallible>(svc)
235 });
236
237 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
238 .serve(make_svc)
239 .with_graceful_shutdown(shutdown.map(|_| ()))
240 .await
241 .map_err(|err| {
242 error!("An error occurred: {:?}.", err);
243 })?;
244
245 Ok(())
246 }))
247 }
248
249 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
250 let definition = self
251 .decoding
252 .schema_definition(global_log_namespace.merge(self.log_namespace))
253 .with_source_metadata(
257 Self::NAME,
258 Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
259 &owned_value_path!("status"),
260 Kind::bytes(),
261 Some(meaning::SEVERITY),
262 )
263 .with_source_metadata(
264 Self::NAME,
265 Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
266 &owned_value_path!("timestamp"),
267 Kind::timestamp(),
268 Some(meaning::TIMESTAMP),
269 )
270 .with_source_metadata(
271 Self::NAME,
272 Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
273 &owned_value_path!("hostname"),
274 Kind::bytes(),
275 Some(meaning::HOST),
276 )
277 .with_source_metadata(
278 Self::NAME,
279 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
280 &owned_value_path!("service"),
281 Kind::bytes(),
282 Some(meaning::SERVICE),
283 )
284 .with_source_metadata(
285 Self::NAME,
286 Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
287 &owned_value_path!("ddsource"),
288 Kind::bytes(),
289 Some(meaning::SOURCE),
290 )
291 .with_source_metadata(
292 Self::NAME,
293 Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
294 &owned_value_path!("ddtags"),
295 if self.parse_ddtags {
296 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
297 } else {
298 Kind::bytes()
299 },
300 Some(meaning::TAGS),
301 )
302 .with_standard_vector_source_metadata();
303
304 let mut output = Vec::with_capacity(1);
305
306 if self.multiple_outputs {
307 if !self.disable_logs {
308 output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
309 }
310 if !self.disable_metrics {
311 output.push(SourceOutput::new_metrics().with_port(METRICS))
312 }
313 if !self.disable_traces {
314 output.push(SourceOutput::new_traces().with_port(TRACES))
315 }
316 } else {
317 output.push(SourceOutput::new_maybe_logs(
318 DataType::all_bits(),
319 definition,
320 ))
321 }
322 output
323 }
324
325 fn resources(&self) -> Vec<Resource> {
326 vec![Resource::tcp(self.address)]
327 }
328
329 fn can_acknowledge(&self) -> bool {
330 true
331 }
332}
333
/// Errors this source surfaces to warp as custom rejections.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Returned by `handle_request` when sending the batch downstream fails.
    // A plain comment is used here on purpose: Snafu would turn a doc comment
    // into this variant's `Display` text.
    ServerShutdown,
}

// Lets `ApiError` be wrapped with `warp::reject::custom`.
impl warp::reject::Reject for ApiError {}
340
/// Query-string parameters that may carry a Datadog API key.
#[derive(Deserialize)]
pub struct ApiKeyQueryParams {
    /// The API key passed as the `dd-api-key` query parameter, if present.
    #[serde(rename = "dd-api-key")]
    pub dd_api_key: Option<String>,
}
346
/// Shared state behind the datadog_agent HTTP handlers.
///
/// A clone is handed to each per-data-type filter.
#[derive(Clone)]
pub(crate) struct DatadogAgentSource {
    /// Extracts the Datadog API key from a request when `store_api_key` is enabled.
    pub(crate) api_key_extractor: ApiKeyExtractor,
    /// Host key from the global log schema, resolved once at construction.
    pub(crate) log_schema_host_key: OwnedTargetPath,
    /// Source-type key from the global log schema, resolved once at construction.
    pub(crate) log_schema_source_type_key: OwnedTargetPath,
    /// Log namespace resolved from the source and global configuration.
    pub(crate) log_namespace: LogNamespace,
    /// Decoder built from the `framing` and `decoding` options.
    pub(crate) decoder: Decoder,
    /// HTTP protocol name reported in `HttpBytesReceived` events.
    protocol: &'static str,
    /// Schema definition for the logs output, if one was registered.
    logs_schema_definition: Option<Arc<schema::Definition>>,
    // NOTE(review): not read in this file. Presumably the per-type handlers use
    // it to emit `EventsReceived`; confirm in `logs`/`metrics`/`traces`.
    events_received: Registered<EventsReceived>,
    /// Copy of `DatadogAgentConfig::parse_ddtags`.
    parse_ddtags: bool,
    /// Copy of `DatadogAgentConfig::split_metric_namespace`.
    split_metric_namespace: bool,
}
360
/// Extracts a Datadog API key from an incoming request.
#[derive(Clone)]
pub struct ApiKeyExtractor {
    /// Regex for `/v1/input/<key>` request paths, with the key in the `api_key` group.
    matcher: Regex,
    /// When `false`, extraction is disabled and no key is ever returned.
    store_api_key: bool,
}
366
367impl ApiKeyExtractor {
368 pub fn extract(
369 &self,
370 path: &str,
371 header: Option<String>,
372 query_params: Option<String>,
373 ) -> Option<Arc<str>> {
374 if !self.store_api_key {
375 return None;
376 }
377 self.matcher
379 .captures(path)
380 .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
381 .or_else(|| query_params.map(Arc::from))
383 .or_else(|| header.map(Arc::from))
385 }
386}
387
388impl DatadogAgentSource {
389 pub(crate) fn new(
390 store_api_key: bool,
391 decoder: Decoder,
392 protocol: &'static str,
393 logs_schema_definition: Option<schema::Definition>,
394 log_namespace: LogNamespace,
395 parse_ddtags: bool,
396 split_metric_namespace: bool,
397 ) -> Self {
398 Self {
399 api_key_extractor: ApiKeyExtractor {
400 store_api_key,
401 matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
402 .expect("static regex always compiles"),
403 },
404 log_schema_host_key: log_schema()
405 .host_key_target_path()
406 .expect("global log_schema.host_key to be valid path")
407 .clone(),
408 log_schema_source_type_key: log_schema()
409 .source_type_key_target_path()
410 .expect("global log_schema.source_type_key to be valid path")
411 .clone(),
412 decoder,
413 protocol,
414 logs_schema_definition: logs_schema_definition.map(Arc::new),
415 log_namespace,
416 events_received: register!(EventsReceived),
417 parse_ddtags,
418 split_metric_namespace,
419 }
420 }
421
422 fn build_warp_filters(
423 &self,
424 out: SourceSender,
425 acknowledgements: bool,
426 config: &DatadogAgentConfig,
427 ) -> crate::Result<BoxedFilter<(Response,)>> {
428 let mut filters = (!config.disable_logs).then(|| {
429 logs::build_warp_filter(
430 acknowledgements,
431 config.multiple_outputs,
432 out.clone(),
433 self.clone(),
434 )
435 });
436
437 if !config.disable_traces {
438 let trace_filter = traces::build_warp_filter(
439 acknowledgements,
440 config.multiple_outputs,
441 out.clone(),
442 self.clone(),
443 );
444 filters = filters
445 .map(|f| f.or(trace_filter.clone()).unify().boxed())
446 .or(Some(trace_filter));
447 }
448
449 if !config.disable_metrics {
450 let metrics_filter = metrics::build_warp_filter(
451 acknowledgements,
452 config.multiple_outputs,
453 out,
454 self.clone(),
455 );
456 filters = filters
457 .map(|f| f.or(metrics_filter.clone()).unify().boxed())
458 .or(Some(metrics_filter));
459 }
460
461 filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
462 }
463
464 pub(crate) fn decode(
465 &self,
466 header: &Option<String>,
467 mut body: Bytes,
468 path: &str,
469 ) -> Result<Bytes, ErrorMessage> {
470 if let Some(encodings) = header {
471 for encoding in encodings.rsplit(',').map(str::trim) {
472 body = match encoding {
473 "identity" => body,
474 "gzip" | "x-gzip" => {
475 let mut decoded = Vec::new();
476 MultiGzDecoder::new(body.reader())
477 .read_to_end(&mut decoded)
478 .map_err(|error| emit_decompress_error(encoding, error))?;
479 decoded.into()
480 }
481 "zstd" => {
482 let mut decoded = Vec::new();
483 zstd::stream::copy_decode(body.reader(), &mut decoded)
484 .map_err(|error| emit_decompress_error(encoding, error))?;
485 decoded.into()
486 }
487 "deflate" | "x-deflate" => {
488 let mut decoded = Vec::new();
489 ZlibDecoder::new(body.reader())
490 .read_to_end(&mut decoded)
491 .map_err(|error| emit_decompress_error(encoding, error))?;
492 decoded.into()
493 }
494 encoding => {
495 return Err(ErrorMessage::new(
496 StatusCode::UNSUPPORTED_MEDIA_TYPE,
497 format!("Unsupported encoding {encoding}"),
498 ));
499 }
500 }
501 }
502 }
503 emit!(HttpBytesReceived {
504 byte_size: body.len(),
505 http_path: path,
506 protocol: self.protocol,
507 });
508 Ok(body)
509 }
510}
511
512pub(crate) async fn handle_request(
513 events: Result<Vec<Event>, ErrorMessage>,
514 acknowledgements: bool,
515 mut out: SourceSender,
516 output: Option<&str>,
517) -> Result<Response, Rejection> {
518 match events {
519 Ok(mut events) => {
520 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
521 let count = events.len();
522
523 if let Some(name) = output {
524 out.send_batch_named(name, events).await
525 } else {
526 out.send_batch(events).await
527 }
528 .map_err(|_| {
529 emit!(StreamClosedError { count });
530 warp::reject::custom(ApiError::ServerShutdown)
531 })?;
532 match receiver {
533 None => Ok(warp::reply().into_response()),
534 Some(receiver) => match receiver.await {
535 BatchStatus::Delivered => Ok(warp::reply().into_response()),
536 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
537 StatusCode::INTERNAL_SERVER_ERROR,
538 "Error delivering contents to sink".into(),
539 ))),
540 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
541 StatusCode::BAD_REQUEST,
542 "Contents failed to deliver to sink".into(),
543 ))),
544 },
545 }
546 }
547 Err(err) => Err(warp::reject::custom(err)),
548 }
549}
550
/// One log entry in the serialized form this source exchanges.
///
/// The fields match the source metadata declared in `outputs`.
// NOTE(review): presumably deserialized from Datadog Agent log payloads in the
// `logs` module. Confirm against that module.
#[derive(Clone, Debug, Deserialize, Serialize)]
// Entries carrying any field not listed below fail to deserialize.
#[serde(deny_unknown_fields)]
struct LogMsg {
    /// The log line itself.
    pub message: Bytes,
    /// Severity; exposed with `meaning::SEVERITY`.
    pub status: Bytes,
    /// Event time, (de)serialized as milliseconds since the Unix epoch.
    #[serde(
        deserialize_with = "ts_milliseconds::deserialize",
        serialize_with = "ts_milliseconds::serialize"
    )]
    pub timestamp: DateTime<Utc>,
    /// Originating host; exposed with `meaning::HOST`.
    pub hostname: Bytes,
    /// Originating service; exposed with `meaning::SERVICE`.
    pub service: Bytes,
    /// Datadog source attribute; exposed with `meaning::SOURCE`.
    pub ddsource: Bytes,
    /// Raw Datadog tag string; exposed with `meaning::TAGS`.
    pub ddtags: Bytes,
}