vector/sources/datadog_agent/
logs.rs1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::{
8 EstimatedJsonEncodedSizeOf,
9 codecs::StreamDecodingError,
10 config::LegacyKey,
11 internal_event::{CountByteSize, InternalEventHandle as _},
12 json_size::JsonSize,
13 lookup::path,
14};
15use vrl::core::Value;
16use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};
17
18use super::{ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, RequestHandler};
19use crate::{
20 common::{datadog::DDTAGS, http::ErrorMessage},
21 event::Event,
22 internal_events::DatadogAgentJsonParseError,
23};
24
25pub(super) fn build_warp_filter(
26 handler: RequestHandler,
27 source: DatadogAgentSource,
28) -> BoxedFilter<(Response,)> {
29 warp::post()
30 .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
31 .and(warp::path::full())
32 .and(warp::header::optional::<String>("content-encoding"))
33 .and(warp::header::optional::<String>("dd-api-key"))
34 .and(warp::query::<ApiKeyQueryParams>())
35 .and(warp::body::bytes())
36 .and_then(
37 move |_,
38 path: FullPath,
39 encoding_header: Option<String>,
40 api_token: Option<String>,
41 query_params: ApiKeyQueryParams,
42 body: Bytes| {
43 let events = source
44 .decode(&encoding_header, body, path.as_str())
45 .and_then(|body| {
46 decode_log_body(
47 body,
48 source.api_key_extractor.extract(
49 path.as_str(),
50 api_token,
51 query_params.dd_api_key,
52 ),
53 &source,
54 )
55 });
56 handler.clone().handle_request(events, super::LOGS)
57 },
58 )
59 .boxed()
60}
61
62pub(crate) fn decode_log_body(
63 body: Bytes,
64 api_key: Option<Arc<str>>,
65 source: &DatadogAgentSource,
66) -> Result<Vec<Event>, ErrorMessage> {
67 if body.is_empty() || body.as_ref() == b"{}" {
68 debug!(message = "Empty payload ignored.");
71 return Ok(Vec::new());
72 }
73
74 let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
75 emit!(DatadogAgentJsonParseError { error: &error });
76
77 ErrorMessage::new(
78 StatusCode::BAD_REQUEST,
79 format!("Error parsing JSON: {error:?}"),
80 )
81 })?;
82
83 let now = Utc::now();
84 let mut decoded = Vec::new();
85 let mut event_bytes_received = JsonSize::zero();
86
87 for LogMsg {
88 message,
89 status,
90 timestamp,
91 hostname,
92 service,
93 ddsource,
94 ddtags,
95 } in messages
96 {
97 let mut decoder = source.decoder.clone();
98 let mut buffer = BytesMut::new();
99 buffer.put(message);
100
101 loop {
102 match decoder.decode_eof(&mut buffer) {
103 Ok(Some((events, _byte_size))) => {
104 for mut event in events {
105 if let Event::Log(ref mut log) = event {
106 let namespace = &source.log_namespace;
107 let source_name = "datadog_agent";
108
109 namespace.insert_source_metadata(
110 source_name,
111 log,
112 Some(LegacyKey::InsertIfEmpty(path!("status"))),
113 path!("status"),
114 status.clone(),
115 );
116 namespace.insert_source_metadata(
117 source_name,
118 log,
119 Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
120 path!("timestamp"),
121 timestamp,
122 );
123 namespace.insert_source_metadata(
124 source_name,
125 log,
126 Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
127 path!("hostname"),
128 hostname.clone(),
129 );
130 namespace.insert_source_metadata(
131 source_name,
132 log,
133 Some(LegacyKey::InsertIfEmpty(path!("service"))),
134 path!("service"),
135 service.clone(),
136 );
137 namespace.insert_source_metadata(
138 source_name,
139 log,
140 Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
141 path!("ddsource"),
142 ddsource.clone(),
143 );
144
145 let ddtags: Value = if source.parse_ddtags {
146 parse_ddtags(&ddtags)
147 } else {
148 ddtags.clone().into()
149 };
150
151 namespace.insert_source_metadata(
152 source_name,
153 log,
154 Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
155 path!(DDTAGS),
156 ddtags,
157 );
158
159 event_bytes_received += log.estimated_json_encoded_size_of();
161
162 namespace.insert_standard_vector_source_metadata(
163 log,
164 DatadogAgentConfig::NAME,
165 now,
166 );
167
168 if let Some(k) = &api_key {
169 log.metadata_mut().set_datadog_api_key(Arc::clone(k));
170 }
171
172 let logs_schema_definition = source
173 .logs_schema_definition
174 .as_ref()
175 .unwrap_or_else(|| panic!("registered log schema required"));
176
177 log.metadata_mut()
178 .set_schema_definition(logs_schema_definition);
179 }
180
181 decoded.push(event);
182 }
183 }
184 Ok(None) => break,
185 Err(error) => {
186 if !error.can_continue() {
189 break;
190 }
191 }
192 }
193 }
194 }
195
196 source
197 .events_received
198 .emit(CountByteSize(decoded.len(), event_bytes_received));
199
200 Ok(decoded)
201}
202
203fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
210 if ddtags_raw.is_empty() {
211 return Vec::<Value>::new().into();
212 }
213
214 let ddtags_str = String::from_utf8_lossy(ddtags_raw);
215
216 let ddtags: Vec<Value> = ddtags_str
218 .split(',')
219 .filter(|kv| !kv.is_empty())
220 .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
221 .collect();
222
223 if ddtags.is_empty() && !ddtags_str.is_empty() {
224 warn!(
225 message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed."
226 )
227 }
228
229 ddtags.into()
230}
231
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vrl::value;

    use super::*;

    /// Runs `parse_ddtags` over the given string payload.
    fn parse(raw: &str) -> Value {
        parse_ddtags(&Bytes::from(raw.to_owned()))
    }

    /// An empty payload produces an empty array.
    #[test]
    fn ddtags_parse_empty() {
        assert_eq!(parse(""), value!([]));
    }

    /// A single tag without a value is kept as-is.
    #[test]
    fn ddtags_parse_bare() {
        assert_eq!(parse("bare"), value!(["bare"]));
    }

    /// A single `key:value` tag is kept as one string.
    #[test]
    fn ddtags_parse_kv_one() {
        assert_eq!(parse("filename:driver.log"), value!(["filename:driver.log"]));
    }

    /// Several `key:value` tags are split on commas, preserving order.
    #[test]
    fn ddtags_parse_kv_multi() {
        assert_eq!(
            parse("filename:driver.log,wizard:the_grey"),
            value!(["filename:driver.log", "wizard:the_grey"])
        );
    }

    /// Bare tags and `key:value` tags can be mixed in one payload.
    #[test]
    fn ddtags_parse_kv_bare_combo() {
        assert_eq!(
            parse("filename:driver.log,debug,wizard:the_grey"),
            value!(["filename:driver.log", "debug", "wizard:the_grey"])
        );
    }
}