vector/sources/datadog_agent/
mod.rs

1#[cfg(all(test, feature = "datadog-agent-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod logs;
7pub mod metrics;
8pub mod traces;
9
10#[allow(warnings, clippy::pedantic, clippy::nursery)]
11pub(crate) mod ddmetric_proto {
12    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
13}
14
15#[allow(warnings)]
16pub(crate) mod ddtrace_proto {
17    include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
18}
19
20use std::{convert::Infallible, fmt::Debug, io::Read, net::SocketAddr, sync::Arc, time::Duration};
21
22use bytes::{Buf, Bytes};
23use chrono::{DateTime, Utc, serde::ts_milliseconds};
24use flate2::read::{MultiGzDecoder, ZlibDecoder};
25use futures::FutureExt;
26use http::StatusCode;
27use hyper::{Server, service::make_service_fn};
28use regex::Regex;
29use serde::{Deserialize, Serialize};
30use snafu::Snafu;
31use tokio::net::TcpStream;
32use tower::ServiceBuilder;
33use tracing::Span;
34use vector_lib::{
35    codecs::decoding::{DeserializerConfig, FramingConfig},
36    config::{LegacyKey, LogNamespace},
37    configurable::configurable_component,
38    event::{BatchNotifier, BatchStatus},
39    internal_event::{EventsReceived, Registered},
40    lookup::owned_value_path,
41    schema::meaning,
42    tls::MaybeTlsIncomingStream,
43};
44use vrl::{
45    path::OwnedTargetPath,
46    value::{Kind, kind::Collection},
47};
48use warp::{Filter, Reply, filters::BoxedFilter, reject::Rejection, reply::Response};
49
50use crate::{
51    SourceSender,
52    codecs::{Decoder, DecodingConfig},
53    common::http::ErrorMessage,
54    config::{
55        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
56        SourceContext, SourceOutput, log_schema,
57    },
58    event::Event,
59    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
60    internal_events::{HttpBytesReceived, StreamClosedError},
61    schema,
62    serde::{bool_or_struct, default_decoding, default_framing_message_based},
63    sources::{self, util::http::emit_decompress_error},
64    tls::{MaybeTlsSettings, TlsEnableableConfig},
65};
66
/// Name of the output port that carries logs when `multiple_outputs` is enabled.
pub const LOGS: &str = "logs";
/// Name of the output port that carries metrics when `multiple_outputs` is enabled.
pub const METRICS: &str = "metrics";
/// Name of the output port that carries traces when `multiple_outputs` is enabled.
pub const TRACES: &str = "traces";
70
71/// Configuration for the `datadog_agent` source.
72#[configurable_component(source(
73    "datadog_agent",
74    "Receive logs, metrics, and traces collected by a Datadog Agent."
75))]
76#[derive(Clone, Debug)]
77pub struct DatadogAgentConfig {
78    /// The socket address to accept connections on.
79    ///
80    /// It _must_ include a port.
81    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
82    #[configurable(metadata(docs::examples = "localhost:80"))]
83    address: SocketAddr,
84
85    /// If this is set to `true`, when incoming events contain a Datadog API key, it is
86    /// stored in the event metadata and used if the event is sent to a Datadog sink.
87    #[configurable(metadata(docs::advanced))]
88    #[serde(default = "crate::serde::default_true")]
89    store_api_key: bool,
90
91    /// If this is set to `true`, logs are not accepted by the component.
92    #[configurable(metadata(docs::advanced))]
93    #[serde(default = "crate::serde::default_false")]
94    disable_logs: bool,
95
96    /// If this is set to `true`, metrics (beta) are not accepted by the component.
97    #[configurable(metadata(docs::advanced))]
98    #[serde(default = "crate::serde::default_false")]
99    disable_metrics: bool,
100
101    /// If this is set to `true`, traces (alpha) are not accepted by the component.
102    #[configurable(metadata(docs::advanced))]
103    #[serde(default = "crate::serde::default_false")]
104    disable_traces: bool,
105
106    /// If this is set to `true`, logs, metrics (beta), and traces (alpha) are sent to different outputs.
107    ///
108    ///
109    /// For a source component named `agent`, the received logs, metrics (beta), and traces (alpha) can then be
110    /// configured as input to other components by specifying `agent.logs`, `agent.metrics`, and
111    /// `agent.traces`, respectively.
112    #[configurable(metadata(docs::advanced))]
113    #[serde(default = "crate::serde::default_false")]
114    multiple_outputs: bool,
115
116    /// If this is set to `true`, when log events contain the field `ddtags`, the string value that
117    /// contains a list of key:value pairs set by the Agent is parsed and expanded into an array.
118    #[configurable(metadata(docs::advanced))]
119    #[serde(default = "crate::serde::default_false")]
120    parse_ddtags: bool,
121
122    /// If this is set to `true`, metric names are split at the first '.' into a namespace and name.
123    /// For example, `system.cpu.usage` would be split into namespace `system` and name `cpu.usage`.
124    /// If `false`, the full metric name is used without splitting. This may be useful if you are using a
125    /// default namespace for metrics in sinks connected to this source.
126    #[configurable(metadata(docs::advanced))]
127    #[serde(default = "crate::serde::default_true")]
128    split_metric_namespace: bool,
129
130    /// The namespace to use for logs. This overrides the global setting.
131    #[serde(default)]
132    #[configurable(metadata(docs::hidden))]
133    log_namespace: Option<bool>,
134
135    #[configurable(derived)]
136    tls: Option<TlsEnableableConfig>,
137
138    #[configurable(derived)]
139    #[serde(default = "default_framing_message_based")]
140    framing: FramingConfig,
141
142    #[configurable(derived)]
143    #[serde(default = "default_decoding")]
144    decoding: DeserializerConfig,
145
146    #[configurable(derived)]
147    #[serde(default, deserialize_with = "bool_or_struct")]
148    acknowledgements: SourceAcknowledgementsConfig,
149
150    #[configurable(derived)]
151    #[serde(default)]
152    keepalive: KeepaliveConfig,
153}
154
155impl GenerateConfig for DatadogAgentConfig {
156    fn generate_config() -> toml::Value {
157        toml::Value::try_from(Self {
158            address: "0.0.0.0:8080".parse().unwrap(),
159            tls: None,
160            store_api_key: true,
161            framing: default_framing_message_based(),
162            decoding: default_decoding(),
163            acknowledgements: SourceAcknowledgementsConfig::default(),
164            disable_logs: false,
165            disable_metrics: false,
166            disable_traces: false,
167            multiple_outputs: false,
168            parse_ddtags: false,
169            split_metric_namespace: true,
170            log_namespace: Some(false),
171            keepalive: KeepaliveConfig::default(),
172        })
173        .unwrap()
174    }
175}
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "datadog_agent")]
179impl SourceConfig for DatadogAgentConfig {
180    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
181        let log_namespace = cx.log_namespace(self.log_namespace);
182
183        let logs_schema_definition = cx
184            .schema_definitions
185            .get(&Some(LOGS.to_owned()))
186            .or_else(|| cx.schema_definitions.get(&None))
187            .cloned();
188
189        let decoder =
190            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
191                .build()?;
192
193        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
194        let source = DatadogAgentSource::new(
195            self.store_api_key,
196            decoder,
197            tls.http_protocol_name(),
198            logs_schema_definition,
199            log_namespace,
200            self.parse_ddtags,
201            self.split_metric_namespace,
202        );
203        let listener = tls.bind(&self.address).await?;
204        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
205        let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
206        let shutdown = cx.shutdown;
207        let keepalive_settings = self.keepalive.clone();
208
209        info!(message = "Building HTTP server.", address = %self.address);
210
211        Ok(Box::pin(async move {
212            let routes = filters.recover(|r: Rejection| async move {
213                if let Some(e_msg) = r.find::<ErrorMessage>() {
214                    let json = warp::reply::json(e_msg);
215                    Ok(warp::reply::with_status(json, e_msg.status_code()))
216                } else {
217                    // other internal error - will return 500 internal server error
218                    Err(r)
219                }
220            });
221
222            let span = Span::current();
223            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
224                let svc = ServiceBuilder::new()
225                    .layer(build_http_trace_layer(span.clone()))
226                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
227                        MaxConnectionAgeLayer::new(
228                            Duration::from_secs(secs),
229                            keepalive_settings.max_connection_age_jitter_factor,
230                            conn.peer_addr(),
231                        )
232                    }))
233                    .service(warp::service(routes.clone()));
234                futures_util::future::ok::<_, Infallible>(svc)
235            });
236
237            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
238                .serve(make_svc)
239                .with_graceful_shutdown(shutdown.map(|_| ()))
240                .await
241                .map_err(|err| {
242                    error!("An error occurred: {:?}.", err);
243                })?;
244
245            Ok(())
246        }))
247    }
248
249    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
250        let definition = self
251            .decoding
252            .schema_definition(global_log_namespace.merge(self.log_namespace))
253            // NOTE: "status" is intentionally semantically mapped to "severity",
254            //       since that is what DD designates as the semantic meaning of status
255            // https://docs.datadoghq.com/logs/log_configuration/attributes_naming_convention/?s=severity#reserved-attributes
256            .with_source_metadata(
257                Self::NAME,
258                Some(LegacyKey::InsertIfEmpty(owned_value_path!("status"))),
259                &owned_value_path!("status"),
260                Kind::bytes(),
261                Some(meaning::SEVERITY),
262            )
263            .with_source_metadata(
264                Self::NAME,
265                Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
266                &owned_value_path!("timestamp"),
267                Kind::timestamp(),
268                Some(meaning::TIMESTAMP),
269            )
270            .with_source_metadata(
271                Self::NAME,
272                Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
273                &owned_value_path!("hostname"),
274                Kind::bytes(),
275                Some(meaning::HOST),
276            )
277            .with_source_metadata(
278                Self::NAME,
279                Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
280                &owned_value_path!("service"),
281                Kind::bytes(),
282                Some(meaning::SERVICE),
283            )
284            .with_source_metadata(
285                Self::NAME,
286                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddsource"))),
287                &owned_value_path!("ddsource"),
288                Kind::bytes(),
289                Some(meaning::SOURCE),
290            )
291            .with_source_metadata(
292                Self::NAME,
293                Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
294                &owned_value_path!("ddtags"),
295                if self.parse_ddtags {
296                    Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined()
297                } else {
298                    Kind::bytes()
299                },
300                Some(meaning::TAGS),
301            )
302            .with_standard_vector_source_metadata();
303
304        let mut output = Vec::with_capacity(1);
305
306        if self.multiple_outputs {
307            if !self.disable_logs {
308                output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
309            }
310            if !self.disable_metrics {
311                output.push(SourceOutput::new_metrics().with_port(METRICS))
312            }
313            if !self.disable_traces {
314                output.push(SourceOutput::new_traces().with_port(TRACES))
315            }
316        } else {
317            output.push(SourceOutput::new_maybe_logs(
318                DataType::all_bits(),
319                definition,
320            ))
321        }
322        output
323    }
324
325    fn resources(&self) -> Vec<Resource> {
326        vec![Resource::tcp(self.address)]
327    }
328
329    fn can_acknowledge(&self) -> bool {
330        true
331    }
332}
333
334#[derive(Clone, Copy, Debug, Snafu)]
335pub(crate) enum ApiError {
336    ServerShutdown,
337}
338
339impl warp::reject::Reject for ApiError {}
340
341#[derive(Deserialize)]
342pub struct ApiKeyQueryParams {
343    #[serde(rename = "dd-api-key")]
344    pub dd_api_key: Option<String>,
345}
346
347#[derive(Clone)]
348pub(crate) struct DatadogAgentSource {
349    pub(crate) api_key_extractor: ApiKeyExtractor,
350    pub(crate) log_schema_host_key: OwnedTargetPath,
351    pub(crate) log_schema_source_type_key: OwnedTargetPath,
352    pub(crate) log_namespace: LogNamespace,
353    pub(crate) decoder: Decoder,
354    protocol: &'static str,
355    logs_schema_definition: Option<Arc<schema::Definition>>,
356    events_received: Registered<EventsReceived>,
357    parse_ddtags: bool,
358    split_metric_namespace: bool,
359}
360
361#[derive(Clone)]
362pub struct ApiKeyExtractor {
363    matcher: Regex,
364    store_api_key: bool,
365}
366
367impl ApiKeyExtractor {
368    pub fn extract(
369        &self,
370        path: &str,
371        header: Option<String>,
372        query_params: Option<String>,
373    ) -> Option<Arc<str>> {
374        if !self.store_api_key {
375            return None;
376        }
377        // Grab from URL first
378        self.matcher
379            .captures(path)
380            .and_then(|cap| cap.name("api_key").map(|key| key.as_str()).map(Arc::from))
381            // Try from query params
382            .or_else(|| query_params.map(Arc::from))
383            // Try from header next
384            .or_else(|| header.map(Arc::from))
385    }
386}
387
388impl DatadogAgentSource {
389    pub(crate) fn new(
390        store_api_key: bool,
391        decoder: Decoder,
392        protocol: &'static str,
393        logs_schema_definition: Option<schema::Definition>,
394        log_namespace: LogNamespace,
395        parse_ddtags: bool,
396        split_metric_namespace: bool,
397    ) -> Self {
398        Self {
399            api_key_extractor: ApiKeyExtractor {
400                store_api_key,
401                matcher: Regex::new(r"^/v1/input/(?P<api_key>[[:alnum:]]{32})/??")
402                    .expect("static regex always compiles"),
403            },
404            log_schema_host_key: log_schema()
405                .host_key_target_path()
406                .expect("global log_schema.host_key to be valid path")
407                .clone(),
408            log_schema_source_type_key: log_schema()
409                .source_type_key_target_path()
410                .expect("global log_schema.source_type_key to be valid path")
411                .clone(),
412            decoder,
413            protocol,
414            logs_schema_definition: logs_schema_definition.map(Arc::new),
415            log_namespace,
416            events_received: register!(EventsReceived),
417            parse_ddtags,
418            split_metric_namespace,
419        }
420    }
421
422    fn build_warp_filters(
423        &self,
424        out: SourceSender,
425        acknowledgements: bool,
426        config: &DatadogAgentConfig,
427    ) -> crate::Result<BoxedFilter<(Response,)>> {
428        let mut filters = (!config.disable_logs).then(|| {
429            logs::build_warp_filter(
430                acknowledgements,
431                config.multiple_outputs,
432                out.clone(),
433                self.clone(),
434            )
435        });
436
437        if !config.disable_traces {
438            let trace_filter = traces::build_warp_filter(
439                acknowledgements,
440                config.multiple_outputs,
441                out.clone(),
442                self.clone(),
443            );
444            filters = filters
445                .map(|f| f.or(trace_filter.clone()).unify().boxed())
446                .or(Some(trace_filter));
447        }
448
449        if !config.disable_metrics {
450            let metrics_filter = metrics::build_warp_filter(
451                acknowledgements,
452                config.multiple_outputs,
453                out,
454                self.clone(),
455            );
456            filters = filters
457                .map(|f| f.or(metrics_filter.clone()).unify().boxed())
458                .or(Some(metrics_filter));
459        }
460
461        filters.ok_or_else(|| "At least one of the supported data type shall be enabled".into())
462    }
463
464    pub(crate) fn decode(
465        &self,
466        header: &Option<String>,
467        mut body: Bytes,
468        path: &str,
469    ) -> Result<Bytes, ErrorMessage> {
470        if let Some(encodings) = header {
471            for encoding in encodings.rsplit(',').map(str::trim) {
472                body = match encoding {
473                    "identity" => body,
474                    "gzip" | "x-gzip" => {
475                        let mut decoded = Vec::new();
476                        MultiGzDecoder::new(body.reader())
477                            .read_to_end(&mut decoded)
478                            .map_err(|error| emit_decompress_error(encoding, error))?;
479                        decoded.into()
480                    }
481                    "zstd" => {
482                        let mut decoded = Vec::new();
483                        zstd::stream::copy_decode(body.reader(), &mut decoded)
484                            .map_err(|error| emit_decompress_error(encoding, error))?;
485                        decoded.into()
486                    }
487                    "deflate" | "x-deflate" => {
488                        let mut decoded = Vec::new();
489                        ZlibDecoder::new(body.reader())
490                            .read_to_end(&mut decoded)
491                            .map_err(|error| emit_decompress_error(encoding, error))?;
492                        decoded.into()
493                    }
494                    encoding => {
495                        return Err(ErrorMessage::new(
496                            StatusCode::UNSUPPORTED_MEDIA_TYPE,
497                            format!("Unsupported encoding {encoding}"),
498                        ));
499                    }
500                }
501            }
502        }
503        emit!(HttpBytesReceived {
504            byte_size: body.len(),
505            http_path: path,
506            protocol: self.protocol,
507        });
508        Ok(body)
509    }
510}
511
512pub(crate) async fn handle_request(
513    events: Result<Vec<Event>, ErrorMessage>,
514    acknowledgements: bool,
515    mut out: SourceSender,
516    output: Option<&str>,
517) -> Result<Response, Rejection> {
518    match events {
519        Ok(mut events) => {
520            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
521            let count = events.len();
522
523            if let Some(name) = output {
524                out.send_batch_named(name, events).await
525            } else {
526                out.send_batch(events).await
527            }
528            .map_err(|_| {
529                emit!(StreamClosedError { count });
530                warp::reject::custom(ApiError::ServerShutdown)
531            })?;
532            match receiver {
533                None => Ok(warp::reply().into_response()),
534                Some(receiver) => match receiver.await {
535                    BatchStatus::Delivered => Ok(warp::reply().into_response()),
536                    BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
537                        StatusCode::INTERNAL_SERVER_ERROR,
538                        "Error delivering contents to sink".into(),
539                    ))),
540                    BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
541                        StatusCode::BAD_REQUEST,
542                        "Contents failed to deliver to sink".into(),
543                    ))),
544                },
545            }
546        }
547        Err(err) => Err(warp::reject::custom(err)),
548    }
549}
550
551// https://github.com/DataDog/datadog-agent/blob/a33248c2bc125920a9577af1e16f12298875a4ad/pkg/logs/processor/json.go#L23-L49
552#[derive(Clone, Debug, Deserialize, Serialize)]
553#[serde(deny_unknown_fields)]
554struct LogMsg {
555    pub message: Bytes,
556    pub status: Bytes,
557    #[serde(
558        deserialize_with = "ts_milliseconds::deserialize",
559        serialize_with = "ts_milliseconds::serialize"
560    )]
561    pub timestamp: DateTime<Utc>,
562    pub hostname: Bytes,
563    pub service: Bytes,
564    pub ddsource: Bytes,
565    pub ddtags: Bytes,
566}