vector/sources/datadog_agent/
logs.rs1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::{
8 EstimatedJsonEncodedSizeOf,
9 codecs::StreamDecodingError,
10 config::LegacyKey,
11 internal_event::{CountByteSize, InternalEventHandle as _},
12 json_size::JsonSize,
13 lookup::path,
14};
15use vrl::core::Value;
16use warp::{Filter, filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response};
17
18use crate::{
19 SourceSender,
20 common::{datadog::DDTAGS, http::ErrorMessage},
21 event::Event,
22 internal_events::DatadogAgentJsonParseError,
23 sources::datadog_agent::{
24 ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg, handle_request,
25 },
26};
27
28pub(crate) fn build_warp_filter(
29 acknowledgements: bool,
30 multiple_outputs: bool,
31 out: SourceSender,
32 source: DatadogAgentSource,
33) -> BoxedFilter<(Response,)> {
34 warp::post()
35 .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
36 .and(warp::path::full())
37 .and(warp::header::optional::<String>("content-encoding"))
38 .and(warp::header::optional::<String>("dd-api-key"))
39 .and(warp::query::<ApiKeyQueryParams>())
40 .and(warp::body::bytes())
41 .and_then(
42 move |_,
43 path: FullPath,
44 encoding_header: Option<String>,
45 api_token: Option<String>,
46 query_params: ApiKeyQueryParams,
47 body: Bytes| {
48 let events = source
49 .decode(&encoding_header, body, path.as_str())
50 .and_then(|body| {
51 decode_log_body(
52 body,
53 source.api_key_extractor.extract(
54 path.as_str(),
55 api_token,
56 query_params.dd_api_key,
57 ),
58 &source,
59 )
60 });
61
62 let output = multiple_outputs.then_some(super::LOGS);
63 handle_request(events, acknowledgements, out.clone(), output)
64 },
65 )
66 .boxed()
67}
68
69pub(crate) fn decode_log_body(
70 body: Bytes,
71 api_key: Option<Arc<str>>,
72 source: &DatadogAgentSource,
73) -> Result<Vec<Event>, ErrorMessage> {
74 if body.is_empty() || body.as_ref() == b"{}" {
75 debug!(
78 message = "Empty payload ignored.",
79 internal_log_rate_limit = true
80 );
81 return Ok(Vec::new());
82 }
83
84 let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
85 emit!(DatadogAgentJsonParseError { error: &error });
86
87 ErrorMessage::new(
88 StatusCode::BAD_REQUEST,
89 format!("Error parsing JSON: {error:?}"),
90 )
91 })?;
92
93 let now = Utc::now();
94 let mut decoded = Vec::new();
95 let mut event_bytes_received = JsonSize::zero();
96
97 for LogMsg {
98 message,
99 status,
100 timestamp,
101 hostname,
102 service,
103 ddsource,
104 ddtags,
105 } in messages
106 {
107 let mut decoder = source.decoder.clone();
108 let mut buffer = BytesMut::new();
109 buffer.put(message);
110
111 loop {
112 match decoder.decode_eof(&mut buffer) {
113 Ok(Some((events, _byte_size))) => {
114 for mut event in events {
115 if let Event::Log(ref mut log) = event {
116 let namespace = &source.log_namespace;
117 let source_name = "datadog_agent";
118
119 namespace.insert_source_metadata(
120 source_name,
121 log,
122 Some(LegacyKey::InsertIfEmpty(path!("status"))),
123 path!("status"),
124 status.clone(),
125 );
126 namespace.insert_source_metadata(
127 source_name,
128 log,
129 Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
130 path!("timestamp"),
131 timestamp,
132 );
133 namespace.insert_source_metadata(
134 source_name,
135 log,
136 Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
137 path!("hostname"),
138 hostname.clone(),
139 );
140 namespace.insert_source_metadata(
141 source_name,
142 log,
143 Some(LegacyKey::InsertIfEmpty(path!("service"))),
144 path!("service"),
145 service.clone(),
146 );
147 namespace.insert_source_metadata(
148 source_name,
149 log,
150 Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
151 path!("ddsource"),
152 ddsource.clone(),
153 );
154
155 let ddtags: Value = if source.parse_ddtags {
156 parse_ddtags(&ddtags)
157 } else {
158 ddtags.clone().into()
159 };
160
161 namespace.insert_source_metadata(
162 source_name,
163 log,
164 Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
165 path!(DDTAGS),
166 ddtags,
167 );
168
169 event_bytes_received += log.estimated_json_encoded_size_of();
171
172 namespace.insert_standard_vector_source_metadata(
173 log,
174 DatadogAgentConfig::NAME,
175 now,
176 );
177
178 if let Some(k) = &api_key {
179 log.metadata_mut().set_datadog_api_key(Arc::clone(k));
180 }
181
182 let logs_schema_definition = source
183 .logs_schema_definition
184 .as_ref()
185 .unwrap_or_else(|| panic!("registered log schema required"));
186
187 log.metadata_mut()
188 .set_schema_definition(logs_schema_definition);
189 }
190
191 decoded.push(event);
192 }
193 }
194 Ok(None) => break,
195 Err(error) => {
196 if !error.can_continue() {
199 break;
200 }
201 }
202 }
203 }
204 }
205
206 source
207 .events_received
208 .emit(CountByteSize(decoded.len(), event_bytes_received));
209
210 Ok(decoded)
211}
212
213fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
220 if ddtags_raw.is_empty() {
221 return Vec::<Value>::new().into();
222 }
223
224 let ddtags_str = String::from_utf8_lossy(ddtags_raw);
225
226 let ddtags: Vec<Value> = ddtags_str
228 .split(',')
229 .filter(|kv| !kv.is_empty())
230 .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
231 .collect();
232
233 if ddtags.is_empty() && !ddtags_str.is_empty() {
234 warn!(
235 message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed."
236 )
237 }
238
239 ddtags.into()
240}
241
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vrl::value;

    use super::*;

    /// Runs `parse_ddtags` over the given raw tag string.
    fn parse(raw: &'static str) -> Value {
        parse_ddtags(&Bytes::from_static(raw.as_bytes()))
    }

    // Empty input yields an empty array.
    #[test]
    fn ddtags_parse_empty() {
        assert_eq!(parse(""), value!([]));
    }

    // A single bare tag is returned as a one-element array.
    #[test]
    fn ddtags_parse_bare() {
        assert_eq!(parse("bare"), value!(["bare"]));
    }

    // A single key:value pair is kept whole.
    #[test]
    fn ddtags_parse_kv_one() {
        assert_eq!(parse("filename:driver.log"), value!(["filename:driver.log"]));
    }

    // Several key:value pairs are split on commas, in order.
    #[test]
    fn ddtags_parse_kv_multi() {
        assert_eq!(
            parse("filename:driver.log,wizard:the_grey"),
            value!(["filename:driver.log", "wizard:the_grey"])
        );
    }

    // Bare tags and key:value pairs can be mixed.
    #[test]
    fn ddtags_parse_kv_bare_combo() {
        assert_eq!(
            parse("filename:driver.log,debug,wizard:the_grey"),
            value!(["filename:driver.log", "debug", "wizard:the_grey"])
        );
    }
}