vector/sources/datadog_agent/
logs.rs

1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::{
8    EstimatedJsonEncodedSizeOf,
9    codecs::StreamDecodingError,
10    config::LegacyKey,
11    internal_event::{CountByteSize, InternalEventHandle as _},
12    json_size::JsonSize,
13    lookup::path,
14};
15use vrl::core::Value;
16use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};
17
18use crate::{
19    SourceSender,
20    common::{datadog::DDTAGS, http::ErrorMessage},
21    event::Event,
22    internal_events::DatadogAgentJsonParseError,
23    sources::datadog_agent::{
24        ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, handle_request,
25    },
26};
27
28pub(crate) fn build_warp_filter(
29    acknowledgements: bool,
30    multiple_outputs: bool,
31    out: SourceSender,
32    source: DatadogAgentSource,
33) -> BoxedFilter<(Response,)> {
34    warp::post()
35        .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
36        .and(warp::path::full())
37        .and(warp::header::optional::<String>("content-encoding"))
38        .and(warp::header::optional::<String>("dd-api-key"))
39        .and(warp::query::<ApiKeyQueryParams>())
40        .and(warp::body::bytes())
41        .and_then(
42            move |_,
43                  path: FullPath,
44                  encoding_header: Option<String>,
45                  api_token: Option<String>,
46                  query_params: ApiKeyQueryParams,
47                  body: Bytes| {
48                let events = source
49                    .decode(&encoding_header, body, path.as_str())
50                    .and_then(|body| {
51                        decode_log_body(
52                            body,
53                            source.api_key_extractor.extract(
54                                path.as_str(),
55                                api_token,
56                                query_params.dd_api_key,
57                            ),
58                            &source,
59                        )
60                    });
61
62                let output = multiple_outputs.then_some(super::LOGS);
63                handle_request(events, acknowledgements, out.clone(), output)
64            },
65        )
66        .boxed()
67}
68
69pub(crate) fn decode_log_body(
70    body: Bytes,
71    api_key: Option<Arc<str>>,
72    source: &DatadogAgentSource,
73) -> Result<Vec<Event>, ErrorMessage> {
74    if body.is_empty() || body.as_ref() == b"{}" {
75        // The datadog agent may send an empty payload as a keep alive
76        // https://github.com/DataDog/datadog-agent/blob/5a6c5dd75a2233fbf954e38ddcc1484df4c21a35/pkg/logs/client/http/destination.go#L52
77        debug!(message = "Empty payload ignored.");
78        return Ok(Vec::new());
79    }
80
81    let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
82        emit!(DatadogAgentJsonParseError { error: &error });
83
84        ErrorMessage::new(
85            StatusCode::BAD_REQUEST,
86            format!("Error parsing JSON: {error:?}"),
87        )
88    })?;
89
90    let now = Utc::now();
91    let mut decoded = Vec::new();
92    let mut event_bytes_received = JsonSize::zero();
93
94    for LogMsg {
95        message,
96        status,
97        timestamp,
98        hostname,
99        service,
100        ddsource,
101        ddtags,
102    } in messages
103    {
104        let mut decoder = source.decoder.clone();
105        let mut buffer = BytesMut::new();
106        buffer.put(message);
107
108        loop {
109            match decoder.decode_eof(&mut buffer) {
110                Ok(Some((events, _byte_size))) => {
111                    for mut event in events {
112                        if let Event::Log(ref mut log) = event {
113                            let namespace = &source.log_namespace;
114                            let source_name = "datadog_agent";
115
116                            namespace.insert_source_metadata(
117                                source_name,
118                                log,
119                                Some(LegacyKey::InsertIfEmpty(path!("status"))),
120                                path!("status"),
121                                status.clone(),
122                            );
123                            namespace.insert_source_metadata(
124                                source_name,
125                                log,
126                                Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
127                                path!("timestamp"),
128                                timestamp,
129                            );
130                            namespace.insert_source_metadata(
131                                source_name,
132                                log,
133                                Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
134                                path!("hostname"),
135                                hostname.clone(),
136                            );
137                            namespace.insert_source_metadata(
138                                source_name,
139                                log,
140                                Some(LegacyKey::InsertIfEmpty(path!("service"))),
141                                path!("service"),
142                                service.clone(),
143                            );
144                            namespace.insert_source_metadata(
145                                source_name,
146                                log,
147                                Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
148                                path!("ddsource"),
149                                ddsource.clone(),
150                            );
151
152                            let ddtags: Value = if source.parse_ddtags {
153                                parse_ddtags(&ddtags)
154                            } else {
155                                ddtags.clone().into()
156                            };
157
158                            namespace.insert_source_metadata(
159                                source_name,
160                                log,
161                                Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
162                                path!(DDTAGS),
163                                ddtags,
164                            );
165
166                            // compute EstimatedJsonSizeOf before enrichment
167                            event_bytes_received += log.estimated_json_encoded_size_of();
168
169                            namespace.insert_standard_vector_source_metadata(
170                                log,
171                                DatadogAgentConfig::NAME,
172                                now,
173                            );
174
175                            if let Some(k) = &api_key {
176                                log.metadata_mut().set_datadog_api_key(Arc::clone(k));
177                            }
178
179                            let logs_schema_definition = source
180                                .logs_schema_definition
181                                .as_ref()
182                                .unwrap_or_else(|| panic!("registered log schema required"));
183
184                            log.metadata_mut()
185                                .set_schema_definition(logs_schema_definition);
186                        }
187
188                        decoded.push(event);
189                    }
190                }
191                Ok(None) => break,
192                Err(error) => {
193                    // Error is logged by `crate::codecs::Decoder`, no further
194                    // handling is needed here.
195                    if !error.can_continue() {
196                        break;
197                    }
198                }
199            }
200        }
201    }
202
203    source
204        .events_received
205        .emit(CountByteSize(decoded.len(), event_bytes_received));
206
207    Ok(decoded)
208}
209
210// ddtags input is a string containing a list of tags which
211// can include both bare tags and key-value pairs.
212// the tag list members are separated by `,` and the
213// tag-value pairs are separated by `:`.
214//
215// The output is an Array regardless of the input string.
216fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
217    if ddtags_raw.is_empty() {
218        return Vec::<Value>::new().into();
219    }
220
221    let ddtags_str = String::from_utf8_lossy(ddtags_raw);
222
223    // There are multiple tags, which could be either bare or pairs
224    let ddtags: Vec<Value> = ddtags_str
225        .split(',')
226        .filter(|kv| !kv.is_empty())
227        .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
228        .collect();
229
230    if ddtags.is_empty() && !ddtags_str.is_empty() {
231        warn!(
232            message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed."
233        )
234    }
235
236    ddtags.into()
237}
238
239#[cfg(test)]
240mod tests {
241    use similar_asserts::assert_eq;
242    use vrl::value;
243
244    use super::*;
245
246    #[test]
247    fn ddtags_parse_empty() {
248        let raw = Bytes::from(String::from(""));
249        let val = parse_ddtags(&raw);
250
251        assert_eq!(val, value!([]));
252    }
253
254    #[test]
255    fn ddtags_parse_bare() {
256        let raw = Bytes::from(String::from("bare"));
257        let val = parse_ddtags(&raw);
258
259        assert_eq!(val, value!(["bare"]));
260    }
261
262    #[test]
263    fn ddtags_parse_kv_one() {
264        let raw = Bytes::from(String::from("filename:driver.log"));
265        let val = parse_ddtags(&raw);
266
267        assert_eq!(val, value!(["filename:driver.log"]));
268    }
269
270    #[test]
271    fn ddtags_parse_kv_multi() {
272        let raw = Bytes::from(String::from("filename:driver.log,wizard:the_grey"));
273        let val = parse_ddtags(&raw);
274
275        assert_eq!(val, value!(["filename:driver.log", "wizard:the_grey"]));
276    }
277
278    #[test]
279    fn ddtags_parse_kv_bare_combo() {
280        let raw = Bytes::from(String::from("filename:driver.log,debug,wizard:the_grey"));
281        let val = parse_ddtags(&raw);
282
283        assert_eq!(
284            val,
285            value!(["filename:driver.log", "debug", "wizard:the_grey"])
286        );
287    }
288}