1#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod logs;
7pub mod metrics;
8pub mod traces;
9
10#[allow(warnings, clippy::pedantic, clippy::nursery)]
11pub(crate) mod ddmetric_proto {
12 include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
13}
14
15#[allow(warnings)]
16pub(crate) mod ddtrace_proto {
17 include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
18}
19
20use std::convert::Infallible;
21use std::time::Duration;
22use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc};
23
24use bytes::{Buf, Bytes};
25use chrono::{serde::ts_milliseconds, DateTime, Utc};
26use flate2::read::{MultiGzDecoder, ZlibDecoder};
27use futures::FutureExt;
28use http::StatusCode;
29use hyper::service::make_service_fn;
30use hyper::Server;
31use regex::Regex;
32use serde::{Deserialize, Serialize};
33use snafu::Snafu;
34use tokio::net::TcpStream;
35use tower::ServiceBuilder;
36use tracing::Span;
37use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
38use vector_lib::config::{LegacyKey, LogNamespace};
39use vector_lib::configurable::configurable_component;
40use vector_lib::event::{BatchNotifier, BatchStatus};
41use vector_lib::internal_event::{EventsReceived, Registered};
42use vector_lib::lookup::owned_value_path;
43use vector_lib::schema::meaning;
44use vector_lib::tls::MaybeTlsIncomingStream;
45use vrl::path::OwnedTargetPath;
46use vrl::value::kind::Collection;
47use vrl::value::Kind;
48use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply};
49
50use crate::common::http::ErrorMessage;
51use crate::http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer};
52use crate::{
53 codecs::{Decoder, DecodingConfig},
54 config::{
55 log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
56 SourceContext, SourceOutput,
57 },
58 event::Event,
59 internal_events::{HttpBytesReceived, HttpDecompressError, StreamClosedError},
60 schema,
61 serde::{bool_or_struct, default_decoding, default_framing_message_based},
62 sources::{self},
63 tls::{MaybeTlsSettings, TlsEnableableConfig},
64 SourceSender,
65};
66
/// Name of the output port carrying log events when `multiple_outputs` is enabled.
pub const LOGS: &str = "logs";
/// Name of the output port carrying metric events when `multiple_outputs` is enabled.
pub const METRICS: &str = "metrics";
/// Name of the output port carrying trace events when `multiple_outputs` is enabled.
pub const TRACES: &str = "traces";
70
71#[configurable_component(source(
73 "datadog_agent",
74 "Receive logs, metrics, and traces collected by a Datadog Agent."
75))]
76#[derive(Clone, Debug)]
77pub struct DatadogAgentConfig {
78 #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
82 #[configurable(metadata(docs::examples = "localhost:80"))]
83 address: SocketAddr,
84
85 #[configurable(metadata(docs::advanced))]
88 #[serde(default = "crate::serde::default_true")]
89 store_api_key: bool,
90
91 #[configurable(metadata(docs::advanced))]
93 #[serde(default = "crate::serde::default_false")]
94 disable_logs: bool,
95
96 #[configurable(metadata(docs::advanced))]
98 #[serde(default = "crate::serde::default_false")]
99 disable_metrics: bool,
100
101 #[configurable(metadata(docs::advanced))]
103 #[serde(default = "crate::serde::default_false")]
104 disable_traces: bool,
105
106 #[configurable(metadata(docs::advanced))]
113 #[serde(default = "crate::serde::default_false")]
114 multiple_outputs: bool,
115
116 #[configurable(metadata(docs::advanced))]
119 #[serde(default = "crate::serde::default_false")]
120 parse_ddtags: bool,
121
122 #[serde(default)]
124 #[configurable(metadata(docs::hidden))]
125 log_namespace: Option<bool>,
126
127 #[configurable(derived)]
128 tls: Option<TlsEnableableConfig>,
129
130 #[configurable(derived)]
131 #[serde(default = "default_framing_message_based")]
132 framing: FramingConfig,
133
134 #[configurable(derived)]
135 #[serde(default = "default_decoding")]
136 decoding: DeserializerConfig,
137
138 #[configurable(derived)]
139 #[serde(default, deserialize_with = "bool_or_struct")]
140 acknowledgements: SourceAcknowledgementsConfig,
141
142 #[configurable(derived)]
143 #[serde(default)]
144 keepalive: KeepaliveConfig,
145}
146
147impl GenerateConfig for DatadogAgentConfig {
148 fn generate_config() -> toml::Value {
149 toml::Value::try_from(Self {
150 address: "0.0.0.0:8080".parse().unwrap(),
151 tls: None,
152 store_api_key: true,
153 framing: default_framing_message_based(),
154 decoding: default_decoding(),
155 acknowledgements: SourceAcknowledgementsConfig::default(),
156 disable_logs: false,
157 disable_metrics: false,
158 disable_traces: false,
159 multiple_outputs: false,
160 parse_ddtags: false,
161 log_namespace: Some(false),
162 keepalive: KeepaliveConfig::default(),
163 })
164 .unwrap()
165 }
166}
167
168#[async_trait::async_trait]
169#[typetag::serde(name = "datadog_agent")]
170impl SourceConfig for DatadogAgentConfig {
171 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
172 let log_namespace = cx.log_namespace(self.log_namespace);
173
174 let logs_schema_definition = cx
175 .schema_definitions
176 .get(&Some(LOGS.to_owned()))
177 .or_else(|| cx.schema_definitions.get(&None))
178 .cloned();
179
180 let decoder =
181 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
182 .build()?;
183
184 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
185 let source = DatadogAgentSource::new(
186 self.store_api_key,
187 decoder,
188 tls.http_protocol_name(),
189 logs_schema_definition,
190 log_namespace,
191 self.parse_ddtags,
192 );
193 let listener = tls.bind(&self.address).await?;
194 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
195 let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
196 let shutdown = cx.shutdown;
197 let keepalive_settings = self.keepalive.clone();
198
199 info!(message = "Building HTTP server.", address = %self.address);
200
201 Ok(Box::pin(async move {
202 let routes = filters.recover(|r: Rejection| async move {
203 if let Some(e_msg) = r.find::<ErrorMessage>() {
204 let json = warp::reply::json(e_msg);
205 Ok(warp::reply::with_status(json, e_msg.status_code()))
206 } else {
207 Err(r)
209 }
210 });
211
212 let span = Span::current();
213 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
214 let svc = ServiceBuilder::new()
215 .layer(build_http_trace_layer(span.clone()))
216 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
217 MaxConnectionAgeLayer::new(
218 Duration::from_secs(secs),
219 keepalive_settings.max_connection_age_jitter_factor,
220 conn.peer_addr(),
221 )
222 }))
223 .service(warp::service(routes.clone()));
224 futures_util::future::ok::<_, Infallible>(svc)
225 });
226
227 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
228 .serve(make_svc)
229 .with_graceful_shutdown(shutdown.map(|_| ()))
230 .await
231 .map_err(|err| {
232 error!("An error occurred: {:?}.", err);
233 })?;
234
235 Ok(())
236 }))
237 }
238
239 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
240 let definition = self
241 .decoding
242 .schema_definition(global_log_namespace.merge(self.log_namespace))
243 .with_source_metadata(
247 Self::NAME,
248 Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
249 &owned_value_path!("status"),
250 Kind::bytes(),
251 Some(meaning::SEVERITY),
252 )
253 .with_source_metadata(
254 Self::NAME,
255 Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
256 &owned_value_path!("timestamp"),
257 Kind::timestamp(),
258 Some(meaning::TIMESTAMP),
259 )
260 .with_source_metadata(
261 Self::NAME,
262 Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
263 &owned_value_path!("hostname"),
264 Kind::bytes(),
265 Some(meaning::HOST),
266 )
267 .with_source_metadata(
268 Self::NAME,
269 Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
270 &owned_value_path!("service"),
271 Kind::bytes(),
272 Some(meaning::SERVICE),
273 )
274 .with_source_metadata(
275 Self::NAME,
276 Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
277 &owned_value_path!("ddsource"),
278 Kind::bytes(),
279 Some(meaning::SOURCE),
280 )
281 .with_source_metadata(
282 Self::NAME,
283 Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
284 &owned_value_path!("ddtags"),
285 if self.parse_ddtags {
286 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
287 } else {
288 Kind::bytes()
289 },
290 Some(meaning::TAGS),
291 )
292 .with_standard_vector_source_metadata();
293
294 let mut output = Vec::with_capacity(1);
295
296 if self.multiple_outputs {
297 if !self.disable_logs {
298 output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
299 }
300 if !self.disable_metrics {
301 output.push(SourceOutput::new_metrics().with_port(METRICS))
302 }
303 if !self.disable_traces {
304 output.push(SourceOutput::new_traces().with_port(TRACES))
305 }
306 } else {
307 output.push(SourceOutput::new_maybe_logs(
308 DataType::all_bits(),
309 definition,
310 ))
311 }
312 output
313 }
314
315 fn resources(&self) -> Vec<Resource> {
316 vec![Resource::tcp(self.address)]
317 }
318
319 fn can_acknowledge(&self) -> bool {
320 true
321 }
322}
323
324#[derive(Clone, Copy, Debug, Snafu)]
325pub(crate) enum ApiError {
326 ServerShutdown,
327}
328
329impl warp::reject::Reject for ApiError {}
330
331#[derive(Deserialize)]
332pub struct ApiKeyQueryParams {
333 #[serde(rename = "dd-api-key")]
334 pub dd_api_key: Option<String>,
335}
336
337#[derive(Clone)]
338pub(crate) struct DatadogAgentSource {
339 pub(crate) api_key_extractor: ApiKeyExtractor,
340 pub(crate) log_schema_host_key: OwnedTargetPath,
341 pub(crate) log_schema_source_type_key: OwnedTargetPath,
342 pub(crate) log_namespace: LogNamespace,
343 pub(crate) decoder: Decoder,
344 protocol: &'static str,
345 logs_schema_definition: Option<Arc<schema::Definition>>,
346 events_received: Registered<EventsReceived>,
347 parse_ddtags: bool,
348}
349
350#[derive(Clone)]
351pub struct ApiKeyExtractor {
352 matcher: Regex,
353 store_api_key: bool,
354}
355
356impl ApiKeyExtractor {
357 pub fn extract(
358 &self,
359 path: &str,
360 header: Option<String>,
361 query_params: Option<String>,
362 ) -> Option<Arc<str>> {
363 if !self.store_api_key {
364 return None;
365 }
366 self.matcher
368 .captures(path)
369 .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
370 .or_else(|| query_params.map(Arc::from))
372 .or_else(|| header.map(Arc::from))
374 }
375}
376
377impl DatadogAgentSource {
378 pub(crate) fn new(
379 store_api_key: bool,
380 decoder: Decoder,
381 protocol: &'static str,
382 logs_schema_definition: Option<schema::Definition>,
383 log_namespace: LogNamespace,
384 parse_ddtags: bool,
385 ) -> Self {
386 Self {
387 api_key_extractor: ApiKeyExtractor {
388 store_api_key,
389 matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
390 .expect("static regex always compiles"),
391 },
392 log_schema_host_key: log_schema()
393 .host_key_target_path()
394 .expect("global log_schema.host_key to be valid path")
395 .clone(),
396 log_schema_source_type_key: log_schema()
397 .source_type_key_target_path()
398 .expect("global log_schema.source_type_key to be valid path")
399 .clone(),
400 decoder,
401 protocol,
402 logs_schema_definition: logs_schema_definition.map(Arc::new),
403 log_namespace,
404 events_received: register!(EventsReceived),
405 parse_ddtags,
406 }
407 }
408
409 fn build_warp_filters(
410 &self,
411 out: SourceSender,
412 acknowledgements: bool,
413 config: &DatadogAgentConfig,
414 ) -> crate::Result<BoxedFilter<(Response,)>> {
415 let mut filters = (!config.disable_logs).then(|| {
416 logs::build_warp_filter(
417 acknowledgements,
418 config.multiple_outputs,
419 out.clone(),
420 self.clone(),
421 )
422 });
423
424 if !config.disable_traces {
425 let trace_filter = traces::build_warp_filter(
426 acknowledgements,
427 config.multiple_outputs,
428 out.clone(),
429 self.clone(),
430 );
431 filters = filters
432 .map(|f| f.or(trace_filter.clone()).unify().boxed())
433 .or(Some(trace_filter));
434 }
435
436 if !config.disable_metrics {
437 let metrics_filter = metrics::build_warp_filter(
438 acknowledgements,
439 config.multiple_outputs,
440 out,
441 self.clone(),
442 );
443 filters = filters
444 .map(|f| f.or(metrics_filter.clone()).unify().boxed())
445 .or(Some(metrics_filter));
446 }
447
448 filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
449 }
450
451 pub(crate) fn decode(
452 &self,
453 header: &Option<String>,
454 mut body: Bytes,
455 path: &str,
456 ) -> Result<Bytes, ErrorMessage> {
457 if let Some(encodings) = header {
458 for encoding in encodings.rsplit(',').map(str::trim) {
459 body = match encoding {
460 "identity" => body,
461 "gzip" | "x-gzip" => {
462 let mut decoded = Vec::new();
463 MultiGzDecoder::new(body.reader())
464 .read_to_end(&mut decoded)
465 .map_err(|error| handle_decode_error(encoding, error))?;
466 decoded.into()
467 }
468 "zstd" => {
469 let mut decoded = Vec::new();
470 zstd::stream::copy_decode(body.reader(), &mut decoded)
471 .map_err(|error| handle_decode_error(encoding, error))?;
472 decoded.into()
473 }
474 "deflate" | "x-deflate" => {
475 let mut decoded = Vec::new();
476 ZlibDecoder::new(body.reader())
477 .read_to_end(&mut decoded)
478 .map_err(|error| handle_decode_error(encoding, error))?;
479 decoded.into()
480 }
481 encoding => {
482 return Err(ErrorMessage::new(
483 StatusCode::UNSUPPORTED_MEDIA_TYPE,
484 format!("Unsupported encoding {encoding}"),
485 ))
486 }
487 }
488 }
489 }
490 emit!(HttpBytesReceived {
491 byte_size: body.len(),
492 http_path: path,
493 protocol: self.protocol,
494 });
495 Ok(body)
496 }
497}
498
499pub(crate) async fn handle_request(
500 events: Result<Vec<Event>, ErrorMessage>,
501 acknowledgements: bool,
502 mut out: SourceSender,
503 output: Option<&str>,
504) -> Result<Response, Rejection> {
505 match events {
506 Ok(mut events) => {
507 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
508 let count = events.len();
509
510 if let Some(name) = output {
511 out.send_batch_named(name, events).await
512 } else {
513 out.send_batch(events).await
514 }
515 .map_err(|_| {
516 emit!(StreamClosedError { count });
517 warp::reject::custom(ApiError::ServerShutdown)
518 })?;
519 match receiver {
520 None => Ok(warp::reply().into_response()),
521 Some(receiver) => match receiver.await {
522 BatchStatus::Delivered => Ok(warp::reply().into_response()),
523 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
524 StatusCode::INTERNAL_SERVER_ERROR,
525 "Error delivering contents to sink".into(),
526 ))),
527 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
528 StatusCode::BAD_REQUEST,
529 "Contents failed to deliver to sink".into(),
530 ))),
531 },
532 }
533 }
534 Err(err) => Err(warp::reject::custom(err)),
535 }
536}
537
538fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
539 emit!(HttpDecompressError {
540 encoding,
541 error: &error
542 });
543 ErrorMessage::new(
544 StatusCode::UNPROCESSABLE_ENTITY,
545 format!("Failed decompressing payload with {encoding} decoder."),
546 )
547}
548
549#[derive(Clone, Debug, Deserialize, Serialize)]
551#[serde(deny_unknown_fields)]
552struct LogMsg {
553 pub message: Bytes,
554 pub status: Bytes,
555 #[serde(
556 deserialize_with = "ts_milliseconds::deserialize",
557 serialize_with = "ts_milliseconds::serialize"
558 )]
559 pub timestamp: DateTime<Utc>,
560 pub hostname: Bytes,
561 pub service: Bytes,
562 pub ddsource: Bytes,
563 pub ddtags: Bytes,
564}