vector/sources/datadog_agent/logs.rs

1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::codecs::StreamDecodingError;
8use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _};
9use vector_lib::json_size::JsonSize;
10use vector_lib::lookup::path;
11use vector_lib::{config::LegacyKey, EstimatedJsonEncodedSizeOf};
12use vrl::core::Value;
13use warp::{filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response, Filter};
14
15use crate::common::datadog::DDTAGS;
16use crate::common::http::ErrorMessage;
17use crate::{
18    event::Event,
19    internal_events::DatadogAgentJsonParseError,
20    sources::datadog_agent::{
21        handle_request, ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg,
22    },
23    SourceSender,
24};
25
26pub(crate) fn build_warp_filter(
27    acknowledgements: bool,
28    multiple_outputs: bool,
29    out: SourceSender,
30    source: DatadogAgentSource,
31) -> BoxedFilter<(Response,)> {
32    warp::post()
33        .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
34        .and(warp::path::full())
35        .and(warp::header::optional::<String>("content-encoding"))
36        .and(warp::header::optional::<String>("dd-api-key"))
37        .and(warp::query::<ApiKeyQueryParams>())
38        .and(warp::body::bytes())
39        .and_then(
40            move |_,
41                  path: FullPath,
42                  encoding_header: Option<String>,
43                  api_token: Option<String>,
44                  query_params: ApiKeyQueryParams,
45                  body: Bytes| {
46                let events = source
47                    .decode(&encoding_header, body, path.as_str())
48                    .and_then(|body| {
49                        decode_log_body(
50                            body,
51                            source.api_key_extractor.extract(
52                                path.as_str(),
53                                api_token,
54                                query_params.dd_api_key,
55                            ),
56                            &source,
57                        )
58                    });
59
60                let output = multiple_outputs.then_some(super::LOGS);
61                handle_request(events, acknowledgements, out.clone(), output)
62            },
63        )
64        .boxed()
65}
66
67pub(crate) fn decode_log_body(
68    body: Bytes,
69    api_key: Option<Arc<str>>,
70    source: &DatadogAgentSource,
71) -> Result<Vec<Event>, ErrorMessage> {
72    if body.is_empty() || body.as_ref() == b"{}" {
73        // The datadog agent may send an empty payload as a keep alive
74        // https://github.com/DataDog/datadog-agent/blob/5a6c5dd75a2233fbf954e38ddcc1484df4c21a35/pkg/logs/client/http/destination.go#L52
75        debug!(
76            message = "Empty payload ignored.",
77            internal_log_rate_limit = true
78        );
79        return Ok(Vec::new());
80    }
81
82    let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
83        emit!(DatadogAgentJsonParseError { error: &error });
84
85        ErrorMessage::new(
86            StatusCode::BAD_REQUEST,
87            format!("Error parsing JSON: {error:?}"),
88        )
89    })?;
90
91    let now = Utc::now();
92    let mut decoded = Vec::new();
93    let mut event_bytes_received = JsonSize::zero();
94
95    for LogMsg {
96        message,
97        status,
98        timestamp,
99        hostname,
100        service,
101        ddsource,
102        ddtags,
103    } in messages
104    {
105        let mut decoder = source.decoder.clone();
106        let mut buffer = BytesMut::new();
107        buffer.put(message);
108
109        loop {
110            match decoder.decode_eof(&mut buffer) {
111                Ok(Some((events, _byte_size))) => {
112                    for mut event in events {
113                        if let Event::Log(ref mut log) = event {
114                            let namespace = &source.log_namespace;
115                            let source_name = "datadog_agent";
116
117                            namespace.insert_source_metadata(
118                                source_name,
119                                log,
120                                Some(LegacyKey::InsertIfEmpty(path!("status"))),
121                                path!("status"),
122                                status.clone(),
123                            );
124                            namespace.insert_source_metadata(
125                                source_name,
126                                log,
127                                Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
128                                path!("timestamp"),
129                                timestamp,
130                            );
131                            namespace.insert_source_metadata(
132                                source_name,
133                                log,
134                                Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
135                                path!("hostname"),
136                                hostname.clone(),
137                            );
138                            namespace.insert_source_metadata(
139                                source_name,
140                                log,
141                                Some(LegacyKey::InsertIfEmpty(path!("service"))),
142                                path!("service"),
143                                service.clone(),
144                            );
145                            namespace.insert_source_metadata(
146                                source_name,
147                                log,
148                                Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
149                                path!("ddsource"),
150                                ddsource.clone(),
151                            );
152
153                            let ddtags: Value = if source.parse_ddtags {
154                                parse_ddtags(&ddtags)
155                            } else {
156                                ddtags.clone().into()
157                            };
158
159                            namespace.insert_source_metadata(
160                                source_name,
161                                log,
162                                Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
163                                path!(DDTAGS),
164                                ddtags,
165                            );
166
167                            // compute EstimatedJsonSizeOf before enrichment
168                            event_bytes_received += log.estimated_json_encoded_size_of();
169
170                            namespace.insert_standard_vector_source_metadata(
171                                log,
172                                DatadogAgentConfig::NAME,
173                                now,
174                            );
175
176                            if let Some(k) = &api_key {
177                                log.metadata_mut().set_datadog_api_key(Arc::clone(k));
178                            }
179
180                            let logs_schema_definition = source
181                                .logs_schema_definition
182                                .as_ref()
183                                .unwrap_or_else(|| panic!("registered log schema required"));
184
185                            log.metadata_mut()
186                                .set_schema_definition(logs_schema_definition);
187                        }
188
189                        decoded.push(event);
190                    }
191                }
192                Ok(None) => break,
193                Err(error) => {
194                    // Error is logged by `crate::codecs::Decoder`, no further
195                    // handling is needed here.
196                    if !error.can_continue() {
197                        break;
198                    }
199                }
200            }
201        }
202    }
203
204    source
205        .events_received
206        .emit(CountByteSize(decoded.len(), event_bytes_received));
207
208    Ok(decoded)
209}
210
211// ddtags input is a string containing a list of tags which
212// can include both bare tags and key-value pairs.
213// the tag list members are separated by `,` and the
214// tag-value pairs are separated by `:`.
215//
216// The output is an Array regardless of the input string.
217fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
218    if ddtags_raw.is_empty() {
219        return Vec::<Value>::new().into();
220    }
221
222    let ddtags_str = String::from_utf8_lossy(ddtags_raw);
223
224    // There are multiple tags, which could be either bare or pairs
225    let ddtags: Vec<Value> = ddtags_str
226        .split(',')
227        .filter(|kv| !kv.is_empty())
228        .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
229        .collect();
230
231    if ddtags.is_empty() && !ddtags_str.is_empty() {
232        warn!(message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed.")
233    }
234
235    ddtags.into()
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use similar_asserts::assert_eq;
    use vrl::value;

    /// Feeds a UTF-8 string literal through `parse_ddtags`.
    fn parse(raw: &str) -> Value {
        parse_ddtags(&Bytes::from(raw.to_owned()))
    }

    #[test]
    fn ddtags_parse_empty() {
        assert_eq!(parse(""), value!([]));
    }

    #[test]
    fn ddtags_parse_bare() {
        assert_eq!(parse("bare"), value!(["bare"]));
    }

    #[test]
    fn ddtags_parse_kv_one() {
        assert_eq!(parse("filename:driver.log"), value!(["filename:driver.log"]));
    }

    #[test]
    fn ddtags_parse_kv_multi() {
        let parsed = parse("filename:driver.log,wizard:the_grey");
        assert_eq!(parsed, value!(["filename:driver.log", "wizard:the_grey"]));
    }

    #[test]
    fn ddtags_parse_kv_bare_combo() {
        let parsed = parse("filename:driver.log,debug,wizard:the_grey");
        assert_eq!(
            parsed,
            value!(["filename:driver.log", "debug", "wizard:the_grey"])
        );
    }
}
286}