vector/sources/datadog_agent/
logs.rs

1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::{
8    EstimatedJsonEncodedSizeOf,
9    codecs::StreamDecodingError,
10    config::LegacyKey,
11    internal_event::{CountByteSize, InternalEventHandle as _},
12    json_size::JsonSize,
13    lookup::path,
14};
15use vrl::core::Value;
16use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};
17
18use super::{ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, RequestHandler};
19use crate::{
20    common::{datadog::DDTAGS, http::ErrorMessage},
21    event::Event,
22    internal_events::DatadogAgentJsonParseError,
23};
24
25pub(super) fn build_warp_filter(
26    handler: RequestHandler,
27    source: DatadogAgentSource,
28) -> BoxedFilter<(Response,)> {
29    warp::post()
30        .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
31        .and(warp::path::full())
32        .and(warp::header::optional::<String>("content-encoding"))
33        .and(warp::header::optional::<String>("dd-api-key"))
34        .and(warp::query::<ApiKeyQueryParams>())
35        .and(warp::body::bytes())
36        .and_then(
37            move |_,
38                  path: FullPath,
39                  encoding_header: Option<String>,
40                  api_token: Option<String>,
41                  query_params: ApiKeyQueryParams,
42                  body: Bytes| {
43                let events = source
44                    .decode(&encoding_header, body, path.as_str())
45                    .and_then(|body| {
46                        decode_log_body(
47                            body,
48                            source.api_key_extractor.extract(
49                                path.as_str(),
50                                api_token,
51                                query_params.dd_api_key,
52                            ),
53                            &source,
54                        )
55                    });
56                handler.clone().handle_request(events, super::LOGS)
57            },
58        )
59        .boxed()
60}
61
62pub(crate) fn decode_log_body(
63    body: Bytes,
64    api_key: Option<Arc<str>>,
65    source: &DatadogAgentSource,
66) -> Result<Vec<Event>, ErrorMessage> {
67    if body.is_empty() || body.as_ref() == b"{}" {
68        // The datadog agent may send an empty payload as a keep alive
69        // https://github.com/DataDog/datadog-agent/blob/5a6c5dd75a2233fbf954e38ddcc1484df4c21a35/pkg/logs/client/http/destination.go#L52
70        debug!(message = "Empty payload ignored.");
71        return Ok(Vec::new());
72    }
73
74    let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
75        emit!(DatadogAgentJsonParseError { error: &error });
76
77        ErrorMessage::new(
78            StatusCode::BAD_REQUEST,
79            format!("Error parsing JSON: {error:?}"),
80        )
81    })?;
82
83    let now = Utc::now();
84    let mut decoded = Vec::new();
85    let mut event_bytes_received = JsonSize::zero();
86
87    for LogMsg {
88        message,
89        status,
90        timestamp,
91        hostname,
92        service,
93        ddsource,
94        ddtags,
95    } in messages
96    {
97        let mut decoder = source.decoder.clone();
98        let mut buffer = BytesMut::new();
99        buffer.put(message);
100
101        loop {
102            match decoder.decode_eof(&mut buffer) {
103                Ok(Some((events, _byte_size))) => {
104                    for mut event in events {
105                        if let Event::Log(ref mut log) = event {
106                            let namespace = &source.log_namespace;
107                            let source_name = "datadog_agent";
108
109                            namespace.insert_source_metadata(
110                                source_name,
111                                log,
112                                Some(LegacyKey::InsertIfEmpty(path!("status"))),
113                                path!("status"),
114                                status.clone(),
115                            );
116                            namespace.insert_source_metadata(
117                                source_name,
118                                log,
119                                Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
120                                path!("timestamp"),
121                                timestamp,
122                            );
123                            namespace.insert_source_metadata(
124                                source_name,
125                                log,
126                                Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
127                                path!("hostname"),
128                                hostname.clone(),
129                            );
130                            namespace.insert_source_metadata(
131                                source_name,
132                                log,
133                                Some(LegacyKey::InsertIfEmpty(path!("service"))),
134                                path!("service"),
135                                service.clone(),
136                            );
137                            namespace.insert_source_metadata(
138                                source_name,
139                                log,
140                                Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
141                                path!("ddsource"),
142                                ddsource.clone(),
143                            );
144
145                            let ddtags: Value = if source.parse_ddtags {
146                                parse_ddtags(&ddtags)
147                            } else {
148                                ddtags.clone().into()
149                            };
150
151                            namespace.insert_source_metadata(
152                                source_name,
153                                log,
154                                Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
155                                path!(DDTAGS),
156                                ddtags,
157                            );
158
159                            // compute EstimatedJsonSizeOf before enrichment
160                            event_bytes_received += log.estimated_json_encoded_size_of();
161
162                            namespace.insert_standard_vector_source_metadata(
163                                log,
164                                DatadogAgentConfig::NAME,
165                                now,
166                            );
167
168                            if let Some(k) = &api_key {
169                                log.metadata_mut().set_datadog_api_key(Arc::clone(k));
170                            }
171
172                            let logs_schema_definition = source
173                                .logs_schema_definition
174                                .as_ref()
175                                .unwrap_or_else(|| panic!("registered log schema required"));
176
177                            log.metadata_mut()
178                                .set_schema_definition(logs_schema_definition);
179                        }
180
181                        decoded.push(event);
182                    }
183                }
184                Ok(None) => break,
185                Err(error) => {
186                    // Error is logged by `crate::codecs::Decoder`, no further
187                    // handling is needed here.
188                    if !error.can_continue() {
189                        break;
190                    }
191                }
192            }
193        }
194    }
195
196    source
197        .events_received
198        .emit(CountByteSize(decoded.len(), event_bytes_received));
199
200    Ok(decoded)
201}
202
203// ddtags input is a string containing a list of tags which
204// can include both bare tags and key-value pairs.
205// the tag list members are separated by `,` and the
206// tag-value pairs are separated by `:`.
207//
208// The output is an Array regardless of the input string.
209fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
210    if ddtags_raw.is_empty() {
211        return Vec::<Value>::new().into();
212    }
213
214    let ddtags_str = String::from_utf8_lossy(ddtags_raw);
215
216    // There are multiple tags, which could be either bare or pairs
217    let ddtags: Vec<Value> = ddtags_str
218        .split(',')
219        .filter(|kv| !kv.is_empty())
220        .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
221        .collect();
222
223    if ddtags.is_empty() && !ddtags_str.is_empty() {
224        warn!(
225            message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed."
226        )
227    }
228
229    ddtags.into()
230}
231
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vrl::value;

    use super::*;

    /// Runs `parse_ddtags` over the given raw tag string.
    fn parse(raw: &'static str) -> Value {
        parse_ddtags(&Bytes::from_static(raw.as_bytes()))
    }

    // An empty input yields an empty array.
    #[test]
    fn ddtags_parse_empty() {
        assert_eq!(parse(""), value!([]));
    }

    // A single bare tag is kept as-is.
    #[test]
    fn ddtags_parse_bare() {
        assert_eq!(parse("bare"), value!(["bare"]));
    }

    // A single key-value pair is kept as one unsplit entry.
    #[test]
    fn ddtags_parse_kv_one() {
        assert_eq!(parse("filename:driver.log"), value!(["filename:driver.log"]));
    }

    // Several key-value pairs produce one entry each, in input order.
    #[test]
    fn ddtags_parse_kv_multi() {
        let parsed = parse("filename:driver.log,wizard:the_grey");

        assert_eq!(parsed, value!(["filename:driver.log", "wizard:the_grey"]));
    }

    // Bare tags and key-value pairs can be mixed freely.
    #[test]
    fn ddtags_parse_kv_bare_combo() {
        let parsed = parse("filename:driver.log,debug,wizard:the_grey");

        assert_eq!(
            parsed,
            value!(["filename:driver.log", "debug", "wizard:the_grey"])
        );
    }
}