vector/sources/datadog_agent/
logs.rs1use std::sync::Arc;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use tokio_util::codec::Decoder;
7use vector_lib::codecs::StreamDecodingError;
8use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _};
9use vector_lib::json_size::JsonSize;
10use vector_lib::lookup::path;
11use vector_lib::{config::LegacyKey, EstimatedJsonEncodedSizeOf};
12use vrl::core::Value;
13use warp::{filters::BoxedFilter, path as warp_path, path::FullPath, reply::Response, Filter};
14
15use crate::common::datadog::DDTAGS;
16use crate::common::http::ErrorMessage;
17use crate::{
18 event::Event,
19 internal_events::DatadogAgentJsonParseError,
20 sources::datadog_agent::{
21 handle_request, ApiKeyQueryParams, DatadogAgentConfig, DatadogAgentSource, LogMsg,
22 },
23 SourceSender,
24};
25
26pub(crate) fn build_warp_filter(
27 acknowledgements: bool,
28 multiple_outputs: bool,
29 out: SourceSender,
30 source: DatadogAgentSource,
31) -> BoxedFilter<(Response,)> {
32 warp::post()
33 .and(warp_path!("v1" / "input" / ..).or(warp_path!("api" / "v2" / "logs" / ..)))
34 .and(warp::path::full())
35 .and(warp::header::optional::<String>("content-encoding"))
36 .and(warp::header::optional::<String>("dd-api-key"))
37 .and(warp::query::<ApiKeyQueryParams>())
38 .and(warp::body::bytes())
39 .and_then(
40 move |_,
41 path: FullPath,
42 encoding_header: Option<String>,
43 api_token: Option<String>,
44 query_params: ApiKeyQueryParams,
45 body: Bytes| {
46 let events = source
47 .decode(&encoding_header, body, path.as_str())
48 .and_then(|body| {
49 decode_log_body(
50 body,
51 source.api_key_extractor.extract(
52 path.as_str(),
53 api_token,
54 query_params.dd_api_key,
55 ),
56 &source,
57 )
58 });
59
60 let output = multiple_outputs.then_some(super::LOGS);
61 handle_request(events, acknowledgements, out.clone(), output)
62 },
63 )
64 .boxed()
65}
66
67pub(crate) fn decode_log_body(
68 body: Bytes,
69 api_key: Option<Arc<str>>,
70 source: &DatadogAgentSource,
71) -> Result<Vec<Event>, ErrorMessage> {
72 if body.is_empty() || body.as_ref() == b"{}" {
73 debug!(
76 message = "Empty payload ignored.",
77 internal_log_rate_limit = true
78 );
79 return Ok(Vec::new());
80 }
81
82 let messages: Vec<LogMsg> = serde_json::from_slice(&body).map_err(|error| {
83 emit!(DatadogAgentJsonParseError { error: &error });
84
85 ErrorMessage::new(
86 StatusCode::BAD_REQUEST,
87 format!("Error parsing JSON: {error:?}"),
88 )
89 })?;
90
91 let now = Utc::now();
92 let mut decoded = Vec::new();
93 let mut event_bytes_received = JsonSize::zero();
94
95 for LogMsg {
96 message,
97 status,
98 timestamp,
99 hostname,
100 service,
101 ddsource,
102 ddtags,
103 } in messages
104 {
105 let mut decoder = source.decoder.clone();
106 let mut buffer = BytesMut::new();
107 buffer.put(message);
108
109 loop {
110 match decoder.decode_eof(&mut buffer) {
111 Ok(Some((events, _byte_size))) => {
112 for mut event in events {
113 if let Event::Log(ref mut log) = event {
114 let namespace = &source.log_namespace;
115 let source_name = "datadog_agent";
116
117 namespace.insert_source_metadata(
118 source_name,
119 log,
120 Some(LegacyKey::InsertIfEmpty(path!("status"))),
121 path!("status"),
122 status.clone(),
123 );
124 namespace.insert_source_metadata(
125 source_name,
126 log,
127 Some(LegacyKey::InsertIfEmpty(path!("timestamp"))),
128 path!("timestamp"),
129 timestamp,
130 );
131 namespace.insert_source_metadata(
132 source_name,
133 log,
134 Some(LegacyKey::InsertIfEmpty(path!("hostname"))),
135 path!("hostname"),
136 hostname.clone(),
137 );
138 namespace.insert_source_metadata(
139 source_name,
140 log,
141 Some(LegacyKey::InsertIfEmpty(path!("service"))),
142 path!("service"),
143 service.clone(),
144 );
145 namespace.insert_source_metadata(
146 source_name,
147 log,
148 Some(LegacyKey::InsertIfEmpty(path!("ddsource"))),
149 path!("ddsource"),
150 ddsource.clone(),
151 );
152
153 let ddtags: Value = if source.parse_ddtags {
154 parse_ddtags(&ddtags)
155 } else {
156 ddtags.clone().into()
157 };
158
159 namespace.insert_source_metadata(
160 source_name,
161 log,
162 Some(LegacyKey::InsertIfEmpty(path!(DDTAGS))),
163 path!(DDTAGS),
164 ddtags,
165 );
166
167 event_bytes_received += log.estimated_json_encoded_size_of();
169
170 namespace.insert_standard_vector_source_metadata(
171 log,
172 DatadogAgentConfig::NAME,
173 now,
174 );
175
176 if let Some(k) = &api_key {
177 log.metadata_mut().set_datadog_api_key(Arc::clone(k));
178 }
179
180 let logs_schema_definition = source
181 .logs_schema_definition
182 .as_ref()
183 .unwrap_or_else(|| panic!("registered log schema required"));
184
185 log.metadata_mut()
186 .set_schema_definition(logs_schema_definition);
187 }
188
189 decoded.push(event);
190 }
191 }
192 Ok(None) => break,
193 Err(error) => {
194 if !error.can_continue() {
197 break;
198 }
199 }
200 }
201 }
202 }
203
204 source
205 .events_received
206 .emit(CountByteSize(decoded.len(), event_bytes_received));
207
208 Ok(decoded)
209}
210
211fn parse_ddtags(ddtags_raw: &Bytes) -> Value {
218 if ddtags_raw.is_empty() {
219 return Vec::<Value>::new().into();
220 }
221
222 let ddtags_str = String::from_utf8_lossy(ddtags_raw);
223
224 let ddtags: Vec<Value> = ddtags_str
226 .split(',')
227 .filter(|kv| !kv.is_empty())
228 .map(|kv| Value::Bytes(Bytes::from(kv.trim().to_string())))
229 .collect();
230
231 if ddtags.is_empty() && !ddtags_str.is_empty() {
232 warn!(message = "`parse_ddtags` set to true and Agent log contains non-empty ddtags string, but no tag-value pairs were parsed.")
233 }
234
235 ddtags.into()
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use similar_asserts::assert_eq;
242 use vrl::value;
243
244 #[test]
245 fn ddtags_parse_empty() {
246 let raw = Bytes::from(String::from(""));
247 let val = parse_ddtags(&raw);
248
249 assert_eq!(val, value!([]));
250 }
251
252 #[test]
253 fn ddtags_parse_bare() {
254 let raw = Bytes::from(String::from("bare"));
255 let val = parse_ddtags(&raw);
256
257 assert_eq!(val, value!(["bare"]));
258 }
259
260 #[test]
261 fn ddtags_parse_kv_one() {
262 let raw = Bytes::from(String::from("filename:driver.log"));
263 let val = parse_ddtags(&raw);
264
265 assert_eq!(val, value!(["filename:driver.log"]));
266 }
267
268 #[test]
269 fn ddtags_parse_kv_multi() {
270 let raw = Bytes::from(String::from("filename:driver.log,wizard:the_grey"));
271 let val = parse_ddtags(&raw);
272
273 assert_eq!(val, value!(["filename:driver.log", "wizard:the_grey"]));
274 }
275
276 #[test]
277 fn ddtags_parse_kv_bare_combo() {
278 let raw = Bytes::from(String::from("filename:driver.log,debug,wizard:the_grey"));
279 let val = parse_ddtags(&raw);
280
281 assert_eq!(
282 val,
283 value!(["filename:driver.log", "debug", "wizard:the_grey"])
284 );
285 }
286}