vector/sources/datadog_agent/
mod.rs

1#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod logs;
7pub mod metrics;
8pub mod traces;
9
10#[allow(warnings, clippy::pedantic, clippy::nursery)]
11pub(crate) mod ddmetric_proto {
12    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
13}
14
15#[allow(warnings)]
16pub(crate) mod ddtrace_proto {
17    include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
18}
19
20use std::{convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc, time::Duration};
21
22use bytes::{Buf, Bytes};
23use chrono::{DateTime, Utc, serde::ts_milliseconds};
24use flate2::read::{MultiGzDecoder, ZlibDecoder};
25use futures::FutureExt;
26use http::StatusCode;
27use hyper::{Server, service::make_service_fn};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use serde_with::serde_as;
31use snafu::Snafu;
32use tokio::net::TcpStream;
33use tower::ServiceBuilder;
34use tracing::Span;
35use vector_lib::{
36    codecs::decoding::{DeserializerConfig, FramingConfig},
37    config::{LegacyKey, LogNamespace},
38    configurable::configurable_component,
39    event::{BatchNotifier, BatchStatus},
40    internal_event::{EventsReceived, Registered},
41    lookup::owned_value_path,
42    schema::meaning,
43    source_sender::SendError,
44    tls::MaybeTlsIncomingStream,
45};
46use vrl::{
47    path::OwnedTargetPath,
48    value::{Kind, kind::Collection},
49};
50use warp::{Filter, Reply, filters::BoxedFilter, reject::Rejection, reply::Response};
51
52use crate::{
53    SourceSender,
54    codecs::{Decoder, DecodingConfig},
55    common::http::ErrorMessage,
56    config::{
57        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
58        SourceContext, SourceOutput, log_schema,
59    },
60    event::Event,
61    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
62    internal_events::{HttpBytesReceived, StreamClosedError},
63    schema,
64    serde::{bool_or_struct, default_decoding, default_framing_message_based},
65    sources::{self, util::http::emit_decompress_error},
66    tls::{MaybeTlsSettings, TlsEnableableConfig},
67};
68
/// Output port name used for logs when `multiple_outputs` is enabled.
pub const LOGS: &str = "logs";
/// Output port name used for metrics when `multiple_outputs` is enabled.
pub const METRICS: &str = "metrics";
/// Output port name used for traces when `multiple_outputs` is enabled.
pub const TRACES: &str = "traces";
72
73/// Configuration for the `datadog_agent` source.
74#[configurable_component(source(
75    "datadog_agent",
76    "Receive logs, metrics, and traces collected by a Datadog Agent."
77))]
78#[serde_as]
79#[derive(Clone, Debug)]
80pub struct DatadogAgentConfig {
81    /// The socket address to accept connections on.
82    ///
83    /// It _must_ include a port.
84    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
85    #[configurable(metadata(docs::examples = "localhost:80"))]
86    address: SocketAddr,
87
88    /// If this is set to `true`, when incoming events contain a Datadog API key, it is
89    /// stored in the event metadata and used if the event is sent to a Datadog sink.
90    #[configurable(metadata(docs::advanced))]
91    #[serde(default = "crate::serde::default_true")]
92    store_api_key: bool,
93
94    /// If this is set to `true`, logs are not accepted by the component.
95    #[configurable(metadata(docs::advanced))]
96    #[serde(default = "crate::serde::default_false")]
97    disable_logs: bool,
98
99    /// If this is set to `true`, metrics (beta) are not accepted by the component.
100    #[configurable(metadata(docs::advanced))]
101    #[serde(default = "crate::serde::default_false")]
102    disable_metrics: bool,
103
104    /// If this is set to `true`, traces (alpha) are not accepted by the component.
105    #[configurable(metadata(docs::advanced))]
106    #[serde(default = "crate::serde::default_false")]
107    disable_traces: bool,
108
109    /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs.
110    ///
111    ///
112    /// For a source component named `agent`, the received logs, metrics (beta), and traces (alpha) can then be
113    /// configured as input to other components by specifying `agent.logs`, `agent.metrics`, and
114    /// `agent.traces`, respectively.
115    #[configurable(metadata(docs::advanced))]
116    #[serde(default = "crate::serde::default_false")]
117    multiple_outputs: bool,
118
119    /// If this is set to `true`, when log events contain the field `ddtags`, the string value that
120    /// contains a list of key:value pairs set by the Agent is parsed and expanded into an array.
121    #[configurable(metadata(docs::advanced))]
122    #[serde(default = "crate::serde::default_false")]
123    parse_ddtags: bool,
124
125    /// If this is set to `true`, metric names are split at the first '.' into a namespace and name.
126    /// For example, `system.cpu.usage` would be split into namespace `system` and name `cpu.usage`.
127    /// If `false`, the full metric name is used without splitting. This may be useful if you are using a
128    /// default namespace for metrics in sinks connected to this source.
129    #[configurable(metadata(docs::advanced))]
130    #[serde(default = "crate::serde::default_true")]
131    split_metric_namespace: bool,
132
133    /// The namespace to use for logs. This overrides the global setting.
134    #[serde(default)]
135    #[configurable(metadata(docs::hidden))]
136    log_namespace: Option<bool>,
137
138    #[configurable(derived)]
139    tls: Option<TlsEnableableConfig>,
140
141    #[configurable(derived)]
142    #[serde(default = "default_framing_message_based")]
143    framing: FramingConfig,
144
145    #[configurable(derived)]
146    #[serde(default = "default_decoding")]
147    decoding: DeserializerConfig,
148
149    #[configurable(derived)]
150    #[serde(default, deserialize_with = "bool_or_struct")]
151    acknowledgements: SourceAcknowledgementsConfig,
152
153    #[configurable(derived)]
154    #[serde(default)]
155    keepalive: KeepaliveConfig,
156
157    /// The timeout before responding to requests with a HTTP 503 Service Unavailable error.
158    ///
159    /// If not set, responses to completed requests will block indefinitely until connected
160    /// transforms or sinks are ready to receive the events. When this happens, the sending Datadog
161    /// Agent will eventually time out the request and drop the connection, resulting Vector
162    /// generating an "Events dropped." error and incrementing the `component_discarded_events_total`
163    /// internal metric. By setting this option to a value less than the Agent's timeout, Vector
164    /// will instead respond to the Agent with a HTTP 503 Service Unavailable error, emit a warning,
165    /// and increment the `component_timed_out_events_total` internal metric instead.
166    #[serde_as(as = "Option<serde_with::DurationSecondsWithFrac<f64>>")]
167    send_timeout_secs: Option<f64>,
168}
169
170impl GenerateConfig for DatadogAgentConfig {
171    fn generate_config() -> toml::Value {
172        toml::Value::try_from(Self {
173            address: "0.0.0.0:8080".parse().unwrap(),
174            tls: None,
175            store_api_key: true,
176            framing: default_framing_message_based(),
177            decoding: default_decoding(),
178            acknowledgements: SourceAcknowledgementsConfig::default(),
179            disable_logs: false,
180            disable_metrics: false,
181            disable_traces: false,
182            multiple_outputs: false,
183            parse_ddtags: false,
184            split_metric_namespace: true,
185            log_namespace: Some(false),
186            keepalive: KeepaliveConfig::default(),
187            send_timeout_secs: None,
188        })
189        .unwrap()
190    }
191}
192
193#[async_trait::async_trait]
194#[typetag::serde(name = "datadog_agent")]
195impl SourceConfig for DatadogAgentConfig {
196    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
197        let log_namespace = cx.log_namespace(self.log_namespace);
198
199        let logs_schema_definition = cx
200            .schema_definitions
201            .get(&Some(LOGS.to_owned()))
202            .or_else(|| cx.schema_definitions.get(&None))
203            .cloned();
204
205        let decoder =
206            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
207                .build()?;
208
209        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
210        let source = DatadogAgentSource::new(
211            self.store_api_key,
212            decoder,
213            tls.http_protocol_name(),
214            logs_schema_definition,
215            log_namespace,
216            self.parse_ddtags,
217            self.split_metric_namespace,
218        );
219        let listener = tls.bind(&self.address).await?;
220        let handler = RequestHandler {
221            acknowledgements: cx.do_acknowledgements(self.acknowledgements),
222            multiple_outputs: self.multiple_outputs,
223            out: cx.out,
224        };
225        let filters = source.build_warp_filters(handler, self)?;
226        let shutdown = cx.shutdown;
227        let keepalive_settings = self.keepalive.clone();
228
229        info!(message = "Building HTTP server.", address = %self.address);
230
231        Ok(Box::pin(async move {
232            let routes = filters.recover(|r: Rejection| async move {
233                if let Some(e_msg) = r.find::<ErrorMessage>() {
234                    let json = warp::reply::json(e_msg);
235                    Ok(warp::reply::with_status(json, e_msg.status_code()))
236                } else {
237                    // other internal error - will return 500 internal server error
238                    Err(r)
239                }
240            });
241
242            let span = Span::current();
243            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
244                let svc = ServiceBuilder::new()
245                    .layer(build_http_trace_layer(span.clone()))
246                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
247                        MaxConnectionAgeLayer::new(
248                            Duration::from_secs(secs),
249                            keepalive_settings.max_connection_age_jitter_factor,
250                            conn.peer_addr(),
251                        )
252                    }))
253                    .service(warp::service(routes.clone()));
254                futures_util::future::ok::<_, Infallible>(svc)
255            });
256
257            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
258                .serve(make_svc)
259                .with_graceful_shutdown(shutdown.map(|_| ()))
260                .await
261                .map_err(|err| {
262                    error!("An error occurred: {:?}.", err);
263                })?;
264
265            Ok(())
266        }))
267    }
268
269    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
270        let definition = self
271            .decoding
272            .schema_definition(global_log_namespace.merge(self.log_namespace))
273            // NOTE: "status" is intentionally semantically mapped to "severity",
274            //       since that is what DD designates as the semantic meaning of status
275            // https://docs.datadoghq.com/logs/log_configuration/attributes_naming_convention/?s=severity#reserved-attributes
276            .with_source_metadata(
277                Self::NAME,
278                Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
279                &owned_value_path!("status"),
280                Kind::bytes(),
281                Some(meaning::SEVERITY),
282            )
283            .with_source_metadata(
284                Self::NAME,
285                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
286                &owned_value_path!("timestamp"),
287                Kind::timestamp(),
288                Some(meaning::TIMESTAMP),
289            )
290            .with_source_metadata(
291                Self::NAME,
292                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
293                &owned_value_path!("hostname"),
294                Kind::bytes(),
295                Some(meaning::HOST),
296            )
297            .with_source_metadata(
298                Self::NAME,
299                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
300                &owned_value_path!("service"),
301                Kind::bytes(),
302                Some(meaning::SERVICE),
303            )
304            .with_source_metadata(
305                Self::NAME,
306                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
307                &owned_value_path!("ddsource"),
308                Kind::bytes(),
309                Some(meaning::SOURCE),
310            )
311            .with_source_metadata(
312                Self::NAME,
313                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
314                &owned_value_path!("ddtags"),
315                if self.parse_ddtags {
316                    Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
317                } else {
318                    Kind::bytes()
319                },
320                Some(meaning::TAGS),
321            )
322            .with_standard_vector_source_metadata();
323
324        let mut output = Vec::with_capacity(1);
325
326        if self.multiple_outputs {
327            if !self.disable_logs {
328                output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
329            }
330            if !self.disable_metrics {
331                output.push(SourceOutput::new_metrics().with_port(METRICS))
332            }
333            if !self.disable_traces {
334                output.push(SourceOutput::new_traces().with_port(TRACES))
335            }
336        } else {
337            output.push(SourceOutput::new_maybe_logs(
338                DataType::all_bits(),
339                definition,
340            ))
341        }
342        output
343    }
344
345    fn resources(&self) -> Vec<Resource> {
346        vec![Resource::tcp(self.address)]
347    }
348
349    fn can_acknowledge(&self) -> bool {
350        true
351    }
352
353    fn send_timeout(&self) -> Option<Duration> {
354        self.send_timeout_secs.map(Duration::from_secs_f64)
355    }
356}
357
358#[derive(Clone, Copy, Debug, Snafu)]
359pub(crate) enum ApiError {
360    ServerShutdown,
361}
362
363impl warp::reject::Reject for ApiError {}
364
365#[derive(Deserialize)]
366pub struct ApiKeyQueryParams {
367    #[serde(rename = "dd-api-key")]
368    pub dd_api_key: Option<String>,
369}
370
371#[derive(Clone)]
372pub(crate) struct DatadogAgentSource {
373    pub(crate) api_key_extractor: ApiKeyExtractor,
374    pub(crate) log_schema_host_key: OwnedTargetPath,
375    pub(crate) log_schema_source_type_key: OwnedTargetPath,
376    pub(crate) log_namespace: LogNamespace,
377    pub(crate) decoder: Decoder,
378    protocol: &'static str,
379    logs_schema_definition: Option<Arc<schema::Definition>>,
380    events_received: Registered<EventsReceived>,
381    parse_ddtags: bool,
382    split_metric_namespace: bool,
383}
384
385#[derive(Clone)]
386pub struct ApiKeyExtractor {
387    matcher: Regex,
388    store_api_key: bool,
389}
390
391impl ApiKeyExtractor {
392    pub fn extract(
393        &self,
394        path: &str,
395        header: Option<String>,
396        query_params: Option<String>,
397    ) -> Option<Arc<str>> {
398        if !self.store_api_key {
399            return None;
400        }
401        // Grab from URL first
402        self.matcher
403            .captures(path)
404            .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
405            // Try from query params
406            .or_else(|| query_params.map(Arc::from))
407            // Try from header next
408            .or_else(|| header.map(Arc::from))
409    }
410}
411
412impl DatadogAgentSource {
413    pub(crate) fn new(
414        store_api_key: bool,
415        decoder: Decoder,
416        protocol: &'static str,
417        logs_schema_definition: Option<schema::Definition>,
418        log_namespace: LogNamespace,
419        parse_ddtags: bool,
420        split_metric_namespace: bool,
421    ) -> Self {
422        Self {
423            api_key_extractor: ApiKeyExtractor {
424                store_api_key,
425                matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
426                    .expect("static regex always compiles"),
427            },
428            log_schema_host_key: log_schema()
429                .host_key_target_path()
430                .expect("global log_schema.host_key to be valid path")
431                .clone(),
432            log_schema_source_type_key: log_schema()
433                .source_type_key_target_path()
434                .expect("global log_schema.source_type_key to be valid path")
435                .clone(),
436            decoder,
437            protocol,
438            logs_schema_definition: logs_schema_definition.map(Arc::new),
439            log_namespace,
440            events_received: register!(EventsReceived),
441            parse_ddtags,
442            split_metric_namespace,
443        }
444    }
445
446    fn build_warp_filters(
447        &self,
448        handler: RequestHandler,
449        config: &DatadogAgentConfig,
450    ) -> crate::Result<BoxedFilter<(Response,)>> {
451        let mut filters =
452            (!config.disable_logs).then(|| logs::build_warp_filter(handler.clone(), self.clone()));
453
454        if !config.disable_traces {
455            let trace_filter = traces::build_warp_filter(handler.clone(), self.clone());
456            filters = filters
457                .map(|f| f.or(trace_filter.clone()).unify().boxed())
458                .or(Some(trace_filter));
459        }
460
461        if !config.disable_metrics {
462            let metrics_filter = metrics::build_warp_filter(handler, self.clone());
463            filters = filters
464                .map(|f| f.or(metrics_filter.clone()).unify().boxed())
465                .or(Some(metrics_filter));
466        }
467
468        filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
469    }
470
471    pub(crate) fn decode(
472        &self,
473        header: &Option<String>,
474        mut body: Bytes,
475        path: &str,
476    ) -> Result<Bytes, ErrorMessage> {
477        if let Some(encodings) = header {
478            for encoding in encodings.rsplit(',').map(str::trim) {
479                body = match encoding {
480                    "identity" => body,
481                    "gzip" | "x-gzip" => {
482                        let mut decoded = Vec::new();
483                        MultiGzDecoder::new(body.reader())
484                            .read_to_end(&mut decoded)
485                            .map_err(|error| emit_decompress_error(encoding, error))?;
486                        decoded.into()
487                    }
488                    "zstd" => {
489                        let mut decoded = Vec::new();
490                        zstd::stream::copy_decode(body.reader(), &mut decoded)
491                            .map_err(|error| emit_decompress_error(encoding, error))?;
492                        decoded.into()
493                    }
494                    "deflate" | "x-deflate" => {
495                        let mut decoded = Vec::new();
496                        ZlibDecoder::new(body.reader())
497                            .read_to_end(&mut decoded)
498                            .map_err(|error| emit_decompress_error(encoding, error))?;
499                        decoded.into()
500                    }
501                    encoding => {
502                        return Err(ErrorMessage::new(
503                            StatusCode::UNSUPPORTED_MEDIA_TYPE,
504                            format!("Unsupported encoding {encoding}"),
505                        ));
506                    }
507                }
508            }
509        }
510        emit!(HttpBytesReceived {
511            byte_size: body.len(),
512            http_path: path,
513            protocol: self.protocol,
514        });
515        Ok(body)
516    }
517}
518
519#[derive(Clone)]
520struct RequestHandler {
521    acknowledgements: bool,
522    multiple_outputs: bool,
523    out: SourceSender,
524}
525
526impl RequestHandler {
527    async fn handle_request(
528        mut self,
529        events: Result<Vec<Event>, ErrorMessage>,
530        output: &'static str,
531    ) -> Result<Response, Rejection> {
532        match events {
533            Ok(events) => self.handle_events(events, output).await,
534            Err(err) => Err(warp::reject::custom(err)),
535        }
536    }
537
538    async fn handle_events(
539        &mut self,
540        mut events: Vec<Event>,
541        output: &'static str,
542    ) -> Result<Response, Rejection> {
543        let receiver = BatchNotifier::maybe_apply_to(self.acknowledgements, &mut events);
544        let count = events.len();
545        let output = self.multiple_outputs.then_some(output);
546
547        let result = if let Some(name) = output {
548            self.out.send_batch_named(name, events).await
549        } else {
550            self.out.send_batch(events).await
551        };
552        match result {
553            Ok(()) => {}
554            Err(SendError::Closed) => {
555                emit!(StreamClosedError { count });
556                return Err(warp::reject::custom(ApiError::ServerShutdown));
557            }
558            Err(SendError::Timeout) => {
559                return Ok(warp::reply::with_status(
560                    "Service unavailable",
561                    StatusCode::SERVICE_UNAVAILABLE,
562                )
563                .into_response());
564            }
565        }
566        match receiver {
567            None => Ok(warp::reply().into_response()),
568            Some(receiver) => match receiver.await {
569                BatchStatus::Delivered => Ok(warp::reply().into_response()),
570                BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
571                    StatusCode::INTERNAL_SERVER_ERROR,
572                    "Error delivering contents to sink".into(),
573                ))),
574                BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
575                    StatusCode::BAD_REQUEST,
576                    "Contents failed to deliver to sink".into(),
577                ))),
578            },
579        }
580    }
581}
582
583// https://github.com/DataDog/datadog-agent/blob/a33248c2bc125920a9577af1e16f12298875a4ad/pkg/logs/processor/json.go#L23-L49
584#[derive(Clone, Debug, Deserialize, Serialize)]
585#[serde(deny_unknown_fields)]
586struct LogMsg {
587    pub message: Bytes,
588    pub status: Bytes,
589    #[serde(
590        deserialize_with = "ts_milliseconds::deserialize",
591        serialize_with = "ts_milliseconds::serialize"
592    )]
593    pub timestamp: DateTime<Utc>,
594    pub hostname: Bytes,
595    pub service: Bytes,
596    pub ddsource: Bytes,
597    pub ddtags: Bytes,
598}