vector/sources/datadog_agent/
logs.rs

1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::{
8    EstimatedJsonEncodedSizeOf,
9    codecs::StreamDecodingError,
10    config::LegacyKey,
11    internal_event::{CountByteSize, InternalEventHandle as _},
12    json_size::JsonSize,
13    lookup::path,
14};
15use vrl::core::Value;
16use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};
17
18use crate::{
19    SourceSender,
20    common::{datadog::DDTAGS, http::ErrorMessage},
21    event::Event,
22    internal_events::DatadogAgentJsonParseError,
23    sources::datadog_agent::{
24        ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, handle_request,
25    },
26};
27
28pub(crate) fn build_warp_filter(
29    acknowledgements: bool,
30    multiple_outputs: bool,
31    out: SourceSender,
32    source: DatadogAgentSource,
33) -> BoxedFilter<(Response,)> {
34    warp::post()
35        .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
36        .and(warp::path::full())
37        .and(warp::header::optional::<String>("content-encoding"))
38        .and(warp::header::optional::<String>("dd-api-key"))
39        .and(warp::query::<ApiKeyQueryParams>())
40        .and(warp::body::bytes())
41        .and_then(
42            move |_,
43                  path: FullPath,
44                  encoding_header: Option<String>,
45                  api_token: Option<String>,
46                  query_params: ApiKeyQueryParams,
47                  body: Bytes| {
48                let events = source
49                    .decode(&encoding_header, body, path.as_str())
50                    .and_then(|body| {
51                        decode_log_body(
52                            body,
53                            source.api_key_extractor.extract(
54                                path.as_str(),
55                                api_token,
56                                query_params.dd_api_key,
57                            ),
58                            &source,
59                        )
60                    });
61
62                let output = multiple_outputs.then_some(super::LOGS);
63                handle_request(events, acknowledgements, out.clone(), output)
64            },
65        )
66        .boxed()
67}
68
69pub(crate) fn decode_log_body(
70    body: Bytes,
71    api_key: Option<Arc<str>>,
72    source: &DatadogAgentSource,
73) -> Result<Vec<Event>, ErrorMessage> {
74    if body.is_empty() || body.as_ref() == b"{}" {
75        // The datadog agent may send an empty payload as a keep alive
76        // https://github.com/DataDog/datadog-agent/blob/5a6c5dd75a2233fbf954e38ddcc1484df4c21a35/pkg/logs/client/http/destination.go#L52
77        debug!(
78            message = "Empty payload ignored.",
79            internal_log_rate_limit = true
80        );
81        return Ok(Vec::new());
82    }
83
84    let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
85        emit!(DatadogAgentJsonParseError { error: &error });
86
87        ErrorMessage::new(
88            StatusCode::BAD_REQUEST,
89            format!("Error parsing JSON: {error:?}"),
90        )
91    })?;
92
93    let now = Utc::now();
94    let mut decoded = Vec::new();
95    let mut event_bytes_received = JsonSize::zero();
96
97    for LogMsg {
98        message,
99        status,
100        timestamp,
101        hostname,
102        service,
103        ddsource,
104        ddtags,
105    } in messages
106    {
107        let mut decoder = source.decoder.clone();
108        let mut buffer = BytesMut::new();
109        buffer.put(message);
110
111        loop {
112            match decoder.decode_eof(&mut buffer) {
113                Ok(Some((events, _byte_size))) => {
114                    for mut event in events {
115                        if let Event::Log(ref mut log) = event {
116                            let namespace = &source.log_namespace;
117                            let source_name = "datadog_agent";
118
119                            namespace.insert_source_metadata(
120                                source_name,
121                                log,
122                                Some(LegacyKey::InsertIfEmpty(path!("status"))),
123                                path!("status"),
124                                status.clone(),
125                            );
126                            namespace.insert_source_metadata(
127                                source_name,
128                                log,
129                                Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
130                                path!("timestamp"),
131                                timestamp,
132                            );
133                            namespace.insert_source_metadata(
134                                source_name,
135                                log,
136                                Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
137                                path!("hostname"),
138                                hostname.clone(),
139                            );
140                            namespace.insert_source_metadata(
141                                source_name,
142                                log,
143                                Some(LegacyKey::InsertIfEmpty(path!("service"))),
144                                path!("service"),
145                                service.clone(),
146                            );
147                            namespace.insert_source_metadata(
148                                source_name,
149                                log,
150                                Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
151                                path!("ddsource"),
152                                ddsource.clone(),
153                            );
154
155                            let ddtags: Value = if source.parse_ddtags {
156                                parse_ddtags(&ddtags)
157                            } else {
158                                ddtags.clone().into()
159                            };
160
161                            namespace.insert_source_metadata(
162                                source_name,
163                                log,
164                                Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
165                                path!(DDTAGS),
166                                ddtags,
167                            );
168
169                            // compute EstimatedJsonSizeOf before enrichment
170                            event_bytes_received += log.estimated_json_encoded_size_of();
171
172                            namespace.insert_standard_vector_source_metadata(
173                                log,
174                                DatadogAgentConfig::NAME,
175                                now,
176                            );
177
178                            if let Some(k) = &api_key {
179                                log.metadata_mut().set_datadog_api_key(Arc::clone(k));
180                            }
181
182                            let logs_schema_definition = source
183                                .logs_schema_definition
184                                .as_ref()
185                                .unwrap_or_else(|| panic!("registered log schema required"));
186
187                            log.metadata_mut()
188                                .set_schema_definition(logs_schema_definition);
189                        }
190
191                        decoded.push(event);
192                    }
193                }
194                Ok(None) => break,
195                Err(error) => {
196                    // Error is logged by `crate::codecs::Decoder`, no further
197                    // handling is needed here.
198                    if !error.can_continue() {
199                        break;
200                    }
201                }
202            }
203        }
204    }
205
206    source
207        .events_received
208        .emit(CountByteSize(decoded.len(), event_bytes_received));
209
210    Ok(decoded)
211}
212
213// ddtags input is a string containing a list of tags which
214// can include both bare tags and key-value pairs.
215// the tag list members are separated by `,` and the
216// tag-value pairs are separated by `:`.
217//
218// The output is an Array regardless of the input string.
219fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
220    if ddtags_raw.is_empty() {
221        return Vec::<Value>::new().into();
222    }
223
224    let ddtags_str = String::from_utf8_lossy(ddtags_raw);
225
226    // There are multiple tags, which could be either bare or pairs
227    let ddtags: Vec<Value> = ddtags_str
228        .split(',')
229        .filter(|kv| !kv.is_empty())
230        .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
231        .collect();
232
233    if ddtags.is_empty() && !ddtags_str.is_empty() {
234        warn!(
235            message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed."
236        )
237    }
238
239    ddtags.into()
240}
241
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vrl::value;

    use super::*;

    /// Runs `parse_ddtags` over the given raw tag string.
    fn parse(raw: &str) -> Value {
        parse_ddtags(&Bytes::from(raw.to_owned()))
    }

    // An empty string produces an empty array.
    #[test]
    fn ddtags_parse_empty() {
        assert_eq!(parse(""), value!([]));
    }

    // A single bare tag is returned as a one-element array.
    #[test]
    fn ddtags_parse_bare() {
        assert_eq!(parse("bare"), value!(["bare"]));
    }

    // A single key-value pair is kept as one string.
    #[test]
    fn ddtags_parse_kv_one() {
        assert_eq!(parse("filename:driver.log"), value!(["filename:driver.log"]));
    }

    // Several key-value pairs become one element each.
    #[test]
    fn ddtags_parse_kv_multi() {
        assert_eq!(
            parse("filename:driver.log,wizard:the_grey"),
            value!(["filename:driver.log", "wizard:the_grey"])
        );
    }

    // Bare tags and key-value pairs can be mixed in the same list.
    #[test]
    fn ddtags_parse_kv_bare_combo() {
        assert_eq!(
            parse("filename:driver.log,debug,wizard:the_grey"),
            value!(["filename:driver.log", "debug", "wizard:the_grey"])
        );
    }
}