vector/sources/datadog_agent/
mod.rs

1#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod logs;
7pub mod metrics;
8pub mod traces;
9
/// Code included verbatim from the `datadog.agentpayload.rs` file found in the
/// build's `OUT_DIR`.
// The included file is not maintained by hand in this module, so lints are
// silenced wholesale here rather than fixed in the included code.
// NOTE(review): presumably this is the protobuf output for the Datadog agent
// metrics payload -- confirm against the build script.
#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddmetric_proto {
    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}
14
/// Code included verbatim from the `dd_trace.rs` file found in the build's
/// `OUT_DIR`.
// The included file is not maintained by hand in this module, so compiler
// warnings are silenced for it as a whole.
// NOTE(review): presumably this is the protobuf output for Datadog trace
// payloads -- confirm against the build script.
#[allow(warnings)]
pub(crate) mod ddtrace_proto {
    include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
}
19
20use std::convert::Infallible;
21use std::time::Duration;
22use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc};
23
24use bytes::{Buf, Bytes};
25use chrono::{serde::ts_milliseconds, DateTime, Utc};
26use flate2::read::{MultiGzDecoder, ZlibDecoder};
27use futures::FutureExt;
28use http::StatusCode;
29use hyper::service::make_service_fn;
30use hyper::Server;
31use regex::Regex;
32use serde::{Deserialize, Serialize};
33use snafu::Snafu;
34use tokio::net::TcpStream;
35use tower::ServiceBuilder;
36use tracing::Span;
37use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
38use vector_lib::config::{LegacyKey, LogNamespace};
39use vector_lib::configurable::configurable_component;
40use vector_lib::event::{BatchNotifier, BatchStatus};
41use vector_lib::internal_event::{EventsReceived, Registered};
42use vector_lib::lookup::owned_value_path;
43use vector_lib::schema::meaning;
44use vector_lib::tls::MaybeTlsIncomingStream;
45use vrl::path::OwnedTargetPath;
46use vrl::value::kind::Collection;
47use vrl::value::Kind;
48use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Reply};
49
50use crate::common::http::ErrorMessage;
51use crate::http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer};
52use crate::{
53    codecs::{Decoder, DecodingConfig},
54    config::{
55        log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
56        SourceContext, SourceOutput,
57    },
58    event::Event,
59    internal_events::{HttpBytesReceived, HttpDecompressError, StreamClosedError},
60    schema,
61    serde::{bool_or_struct, default_decoding, default_framing_message_based},
62    sources::{self},
63    tls::{MaybeTlsSettings, TlsEnableableConfig},
64    SourceSender,
65};
66
/// Name of the output port that carries logs when outputs are split per data type.
pub const LOGS: &str = "logs";
/// Name of the output port that carries metrics when outputs are split per data type.
pub const METRICS: &str = "metrics";
/// Name of the output port that carries traces when outputs are split per data type.
pub const TRACES: &str = "traces";
70
/// Configuration for the `datadog_agent` source.
// NOTE(review): the `///` comments on this struct and its fields look like they
// are consumed by `configurable_component` for generated documentation, so they
// are left untouched; reviewer notes below use plain `//` comments on purpose.
#[configurable_component(source(
    "datadog_agent",
    "Receive logs, metrics, and traces collected by a Datadog Agent."
))]
#[derive(Clone, Debug)]
pub struct DatadogAgentConfig {
    /// The socket address to accept connections on.
    ///
    /// It _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// If this is set to `true`, when incoming events contain a Datadog API key, it is
    /// stored in the event metadata and used if the event is sent to a Datadog sink.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_true")]
    store_api_key: bool,

    /// If this is set to `true`, logs are not accepted by the component.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_logs: bool,

    /// If this is set to `true`, metrics (beta) are not accepted by the component.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_metrics: bool,

    /// If this is set to `true`, traces (alpha) are not accepted by the component.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    disable_traces: bool,

    /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs.
    ///
    ///
    /// For a source component named `agent`, the received logs, metrics (beta), and traces (alpha) can then be
    /// configured as input to other components by specifying `agent.logs`, `agent.metrics`, and
    /// `agent.traces`, respectively.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    multiple_outputs: bool,

    /// If this is set to `true`, when log events contain the field `ddtags`, the string value that
    /// contains a list of key:value pairs set by the Agent is parsed and expanded into an array.
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "crate::serde::default_false")]
    parse_ddtags: bool,

    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    log_namespace: Option<bool>,

    // Optional TLS settings for the listener; `None` is passed straight through
    // to `MaybeTlsSettings::from_config` when the source is built.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Framing used when building the decoder; defaults to
    // `default_framing_message_based()` when omitted.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    // Deserializer used when building the decoder and when computing the log
    // schema definition; defaults to `default_decoding()` when omitted.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // Acknowledgement settings; deserialized through `bool_or_struct`, so the
    // accepted input forms are whatever that helper allows.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // Keepalive settings; `max_connection_age_secs` and
    // `max_connection_age_jitter_factor` are read when the HTTP service is built.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
146
147impl GenerateConfig for DatadogAgentConfig {
148    fn generate_config() -> toml::Value {
149        toml::Value::try_from(Self {
150            address: "0.0.0.0:8080".parse().unwrap(),
151            tls: None,
152            store_api_key: true,
153            framing: default_framing_message_based(),
154            decoding: default_decoding(),
155            acknowledgements: SourceAcknowledgementsConfig::default(),
156            disable_logs: false,
157            disable_metrics: false,
158            disable_traces: false,
159            multiple_outputs: false,
160            parse_ddtags: false,
161            log_namespace: Some(false),
162            keepalive: KeepaliveConfig::default(),
163        })
164        .unwrap()
165    }
166}
167
168#[async_trait::async_trait]
169#[typetag::serde(name = "datadog_agent")]
170impl SourceConfig for DatadogAgentConfig {
171    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
172        let log_namespace = cx.log_namespace(self.log_namespace);
173
174        let logs_schema_definition = cx
175            .schema_definitions
176            .get(&Some(LOGS.to_owned()))
177            .or_else(|| cx.schema_definitions.get(&None))
178            .cloned();
179
180        let decoder =
181            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
182                .build()?;
183
184        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
185        let source = DatadogAgentSource::new(
186            self.store_api_key,
187            decoder,
188            tls.http_protocol_name(),
189            logs_schema_definition,
190            log_namespace,
191            self.parse_ddtags,
192        );
193        let listener = tls.bind(&self.address).await?;
194        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
195        let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
196        let shutdown = cx.shutdown;
197        let keepalive_settings = self.keepalive.clone();
198
199        info!(message = "Building HTTP server.", address = %self.address);
200
201        Ok(Box::pin(async move {
202            let routes = filters.recover(|r: Rejection| async move {
203                if let Some(e_msg) = r.find::<ErrorMessage>() {
204                    let json = warp::reply::json(e_msg);
205                    Ok(warp::reply::with_status(json, e_msg.status_code()))
206                } else {
207                    // other internal error - will return 500 internal server error
208                    Err(r)
209                }
210            });
211
212            let span = Span::current();
213            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
214                let svc = ServiceBuilder::new()
215                    .layer(build_http_trace_layer(span.clone()))
216                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
217                        MaxConnectionAgeLayer::new(
218                            Duration::from_secs(secs),
219                            keepalive_settings.max_connection_age_jitter_factor,
220                            conn.peer_addr(),
221                        )
222                    }))
223                    .service(warp::service(routes.clone()));
224                futures_util::future::ok::<_, Infallible>(svc)
225            });
226
227            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
228                .serve(make_svc)
229                .with_graceful_shutdown(shutdown.map(|_| ()))
230                .await
231                .map_err(|err| {
232                    error!("An error occurred: {:?}.", err);
233                })?;
234
235            Ok(())
236        }))
237    }
238
239    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
240        let definition = self
241            .decoding
242            .schema_definition(global_log_namespace.merge(self.log_namespace))
243            // NOTE: "status" is intentionally semantically mapped to "severity",
244            //       since that is what DD designates as the semantic meaning of status
245            // https://docs.datadoghq.com/logs/log_configuration/attributes_naming_convention/?s=severity#reserved-attributes
246            .with_source_metadata(
247                Self::NAME,
248                Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
249                &owned_value_path!("status"),
250                Kind::bytes(),
251                Some(meaning::SEVERITY),
252            )
253            .with_source_metadata(
254                Self::NAME,
255                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
256                &owned_value_path!("timestamp"),
257                Kind::timestamp(),
258                Some(meaning::TIMESTAMP),
259            )
260            .with_source_metadata(
261                Self::NAME,
262                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
263                &owned_value_path!("hostname"),
264                Kind::bytes(),
265                Some(meaning::HOST),
266            )
267            .with_source_metadata(
268                Self::NAME,
269                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
270                &owned_value_path!("service"),
271                Kind::bytes(),
272                Some(meaning::SERVICE),
273            )
274            .with_source_metadata(
275                Self::NAME,
276                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
277                &owned_value_path!("ddsource"),
278                Kind::bytes(),
279                Some(meaning::SOURCE),
280            )
281            .with_source_metadata(
282                Self::NAME,
283                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
284                &owned_value_path!("ddtags"),
285                if self.parse_ddtags {
286                    Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
287                } else {
288                    Kind::bytes()
289                },
290                Some(meaning::TAGS),
291            )
292            .with_standard_vector_source_metadata();
293
294        let mut output = Vec::with_capacity(1);
295
296        if self.multiple_outputs {
297            if !self.disable_logs {
298                output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
299            }
300            if !self.disable_metrics {
301                output.push(SourceOutput::new_metrics().with_port(METRICS))
302            }
303            if !self.disable_traces {
304                output.push(SourceOutput::new_traces().with_port(TRACES))
305            }
306        } else {
307            output.push(SourceOutput::new_maybe_logs(
308                DataType::all_bits(),
309                definition,
310            ))
311        }
312        output
313    }
314
315    fn resources(&self) -> Vec<Resource> {
316        vec![Resource::tcp(self.address)]
317    }
318
319    fn can_acknowledge(&self) -> bool {
320        true
321    }
322}
323
// Errors surfaced to HTTP clients as warp rejections.
// Plain `//` comments are used here on purpose: with the `Snafu` derive, doc
// comments on variants may be picked up as the `Display` text.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Used by `handle_request` when a batch of events could not be sent downstream.
    ServerShutdown,
}

// Marker impl so `ApiError` can be wrapped with `warp::reject::custom`.
impl warp::reject::Reject for ApiError {}
330
/// Query-string parameters from which an API key may be read.
#[derive(Deserialize)]
pub struct ApiKeyQueryParams {
    /// Value of the `dd-api-key` query parameter, if it was supplied.
    #[serde(rename = "dd-api-key")]
    pub dd_api_key: Option<String>,
}
336
/// Shared state for the `datadog_agent` source.
///
/// A clone of this value is handed to each of the logs, traces, and metrics
/// warp filters when they are built.
#[derive(Clone)]
pub(crate) struct DatadogAgentSource {
    /// Extracts the API key from the request path, query parameters, or header.
    pub(crate) api_key_extractor: ApiKeyExtractor,
    /// Target path of the host key, taken from the global log schema.
    pub(crate) log_schema_host_key: OwnedTargetPath,
    /// Target path of the source type key, taken from the global log schema.
    pub(crate) log_schema_source_type_key: OwnedTargetPath,
    /// Log namespace this source was built with.
    pub(crate) log_namespace: LogNamespace,
    /// Decoder built from the configured framing and decoding settings.
    pub(crate) decoder: Decoder,
    /// Protocol name reported with `HttpBytesReceived` events.
    protocol: &'static str,
    /// Schema definition for the logs output, when one is available.
    logs_schema_definition: Option<Arc<schema::Definition>>,
    /// Registered handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
    /// Mirrors the `parse_ddtags` configuration option.
    parse_ddtags: bool,
}
349
/// Finds the Datadog API key that accompanies an incoming request.
#[derive(Clone)]
pub struct ApiKeyExtractor {
    /// Pattern applied to the request path; its `api_key` capture group holds the key.
    matcher: Regex,
    /// When `false`, `extract` never returns a key.
    store_api_key: bool,
}
355
356impl ApiKeyExtractor {
357    pub fn extract(
358        &self,
359        path: &str,
360        header: Option<String>,
361        query_params: Option<String>,
362    ) -> Option<Arc<str>> {
363        if !self.store_api_key {
364            return None;
365        }
366        // Grab from URL first
367        self.matcher
368            .captures(path)
369            .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
370            // Try from query params
371            .or_else(|| query_params.map(Arc::from))
372            // Try from header next
373            .or_else(|| header.map(Arc::from))
374    }
375}
376
377impl DatadogAgentSource {
378    pub(crate) fn new(
379        store_api_key: bool,
380        decoder: Decoder,
381        protocol: &'static str,
382        logs_schema_definition: Option<schema::Definition>,
383        log_namespace: LogNamespace,
384        parse_ddtags: bool,
385    ) -> Self {
386        Self {
387            api_key_extractor: ApiKeyExtractor {
388                store_api_key,
389                matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
390                    .expect("static regex always compiles"),
391            },
392            log_schema_host_key: log_schema()
393                .host_key_target_path()
394                .expect("global log_schema.host_key to be valid path")
395                .clone(),
396            log_schema_source_type_key: log_schema()
397                .source_type_key_target_path()
398                .expect("global log_schema.source_type_key to be valid path")
399                .clone(),
400            decoder,
401            protocol,
402            logs_schema_definition: logs_schema_definition.map(Arc::new),
403            log_namespace,
404            events_received: register!(EventsReceived),
405            parse_ddtags,
406        }
407    }
408
409    fn build_warp_filters(
410        &self,
411        out: SourceSender,
412        acknowledgements: bool,
413        config: &DatadogAgentConfig,
414    ) -> crate::Result<BoxedFilter<(Response,)>> {
415        let mut filters = (!config.disable_logs).then(|| {
416            logs::build_warp_filter(
417                acknowledgements,
418                config.multiple_outputs,
419                out.clone(),
420                self.clone(),
421            )
422        });
423
424        if !config.disable_traces {
425            let trace_filter = traces::build_warp_filter(
426                acknowledgements,
427                config.multiple_outputs,
428                out.clone(),
429                self.clone(),
430            );
431            filters = filters
432                .map(|f| f.or(trace_filter.clone()).unify().boxed())
433                .or(Some(trace_filter));
434        }
435
436        if !config.disable_metrics {
437            let metrics_filter = metrics::build_warp_filter(
438                acknowledgements,
439                config.multiple_outputs,
440                out,
441                self.clone(),
442            );
443            filters = filters
444                .map(|f| f.or(metrics_filter.clone()).unify().boxed())
445                .or(Some(metrics_filter));
446        }
447
448        filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
449    }
450
451    pub(crate) fn decode(
452        &self,
453        header: &Option<String>,
454        mut body: Bytes,
455        path: &str,
456    ) -> Result<Bytes, ErrorMessage> {
457        if let Some(encodings) = header {
458            for encoding in encodings.rsplit(',').map(str::trim) {
459                body = match encoding {
460                    "identity" => body,
461                    "gzip" | "x-gzip" => {
462                        let mut decoded = Vec::new();
463                        MultiGzDecoder::new(body.reader())
464                            .read_to_end(&mut decoded)
465                            .map_err(|error| handle_decode_error(encoding, error))?;
466                        decoded.into()
467                    }
468                    "zstd" => {
469                        let mut decoded = Vec::new();
470                        zstd::stream::copy_decode(body.reader(), &mut decoded)
471                            .map_err(|error| handle_decode_error(encoding, error))?;
472                        decoded.into()
473                    }
474                    "deflate" | "x-deflate" => {
475                        let mut decoded = Vec::new();
476                        ZlibDecoder::new(body.reader())
477                            .read_to_end(&mut decoded)
478                            .map_err(|error| handle_decode_error(encoding, error))?;
479                        decoded.into()
480                    }
481                    encoding => {
482                        return Err(ErrorMessage::new(
483                            StatusCode::UNSUPPORTED_MEDIA_TYPE,
484                            format!("Unsupported encoding {encoding}"),
485                        ))
486                    }
487                }
488            }
489        }
490        emit!(HttpBytesReceived {
491            byte_size: body.len(),
492            http_path: path,
493            protocol: self.protocol,
494        });
495        Ok(body)
496    }
497}
498
499pub(crate) async fn handle_request(
500    events: Result<Vec<Event>, ErrorMessage>,
501    acknowledgements: bool,
502    mut out: SourceSender,
503    output: Option<&str>,
504) -> Result<Response, Rejection> {
505    match events {
506        Ok(mut events) => {
507            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
508            let count = events.len();
509
510            if let Some(name) = output {
511                out.send_batch_named(name, events).await
512            } else {
513                out.send_batch(events).await
514            }
515            .map_err(|_| {
516                emit!(StreamClosedError { count });
517                warp::reject::custom(ApiError::ServerShutdown)
518            })?;
519            match receiver {
520                None => Ok(warp::reply().into_response()),
521                Some(receiver) => match receiver.await {
522                    BatchStatus::Delivered => Ok(warp::reply().into_response()),
523                    BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
524                        StatusCode::INTERNAL_SERVER_ERROR,
525                        "Error delivering contents to sink".into(),
526                    ))),
527                    BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
528                        StatusCode::BAD_REQUEST,
529                        "Contents failed to deliver to sink".into(),
530                    ))),
531                },
532            }
533        }
534        Err(err) => Err(warp::reject::custom(err)),
535    }
536}
537
538fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
539    emit!(HttpDecompressError {
540        encoding,
541        error: &error
542    });
543    ErrorMessage::new(
544        StatusCode::UNPROCESSABLE_ENTITY,
545        format!("Failed decompressing payload with {encoding} decoder."),
546    )
547}
548
// https://github.com/DataDog/datadog-agent/blob/a33248c2bc125920a9577af1e16f12298875a4ad/pkg/logs/processor/json.go#L23-L49
/// A single log entry in the JSON shape handled by this source.
///
/// Deserialization fails on any field not listed here because of
/// `deny_unknown_fields`.
// NOTE(review): the field set presumably mirrors the Datadog Agent's JSON log
// payload referenced by the link above -- confirm when the Agent format changes.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
struct LogMsg {
    /// The log message body.
    pub message: Bytes,
    /// The log status.
    pub status: Bytes,
    /// The log timestamp, (de)serialized as milliseconds since the Unix epoch.
    #[serde(
        deserialize_with = "ts_milliseconds::deserialize",
        serialize_with = "ts_milliseconds::serialize"
    )]
    pub timestamp: DateTime<Utc>,
    /// The `hostname` field of the entry.
    pub hostname: Bytes,
    /// The `service` field of the entry.
    pub service: Bytes,
    /// The `ddsource` field of the entry.
    pub ddsource: Bytes,
    /// The `ddtags` field of the entry, kept as the raw value.
    pub ddtags: Bytes,
}
564}