1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use serde_json::Value as JsonValue;
4use snafu::{OptionExt, ResultExt, Snafu};
5use vector_lib::{
6 config::{LegacyKey, LogNamespace},
7 lookup::{self, OwnedTargetPath, path},
8};
9
10use crate::{
11 config::log_schema,
12 event::{self, Event, LogEvent, Value},
13 internal_events::KubernetesLogsDockerFormatParseError,
14 sources::kubernetes_logs::{Config, transform_utils::get_message_path},
15 transforms::{FunctionTransform, OutputBuffer},
16};
17
/// JSON key under which a json-file record stores the log line itself.
pub const MESSAGE_KEY: &str = "log";
/// JSON key naming the output stream the line was written to (e.g. `stdout`, `stderr`).
pub const STREAM_KEY: &str = "stream";
/// JSON key holding the record's timestamp, an RFC 3339 string.
pub const TIMESTAMP_KEY: &str = "time";
21
22#[derive(Clone, Debug)]
29pub(super) struct Docker {
30 log_namespace: LogNamespace,
31}
32
33impl Docker {
34 pub const fn new(log_namespace: LogNamespace) -> Self {
35 Self { log_namespace }
36 }
37}
38
39impl FunctionTransform for Docker {
40 fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
41 let log = event.as_mut_log();
42 if let Err(err) = parse_json(log, self.log_namespace) {
43 emit!(KubernetesLogsDockerFormatParseError { error: &err });
44 return;
45 }
46 if let Err(err) = normalize_event(log, self.log_namespace) {
47 emit!(KubernetesLogsDockerFormatParseError { error: &err });
48 return;
49 }
50 output.push(event);
51 }
52}
53
54fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> {
56 let target_path = get_message_path(log_namespace);
57
58 let value = log
59 .remove(&target_path)
60 .ok_or(ParsingError::NoMessageField)?;
61
62 let bytes = match value {
63 Value::Bytes(bytes) => bytes,
64 _ => return Err(ParsingError::MessageFieldNotInBytes),
65 };
66
67 match serde_json::from_slice(bytes.as_ref()) {
68 Ok(JsonValue::Object(object)) => {
69 for (key, value) in object {
70 match key.as_str() {
71 MESSAGE_KEY => drop(log.insert(&target_path, value)),
72 STREAM_KEY => log_namespace.insert_source_metadata(
73 Config::NAME,
74 log,
75 Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
76 path!(STREAM_KEY),
77 value,
78 ),
79 TIMESTAMP_KEY => log_namespace.insert_source_metadata(
80 Config::NAME,
81 log,
82 log_schema().timestamp_key().map(LegacyKey::Overwrite),
83 path!("timestamp"),
84 value,
85 ),
86 _ => unreachable!("all json-file keys should be matched"),
87 };
88 }
89 Ok(())
90 }
91 Ok(_) => Err(ParsingError::NotAnObject { message: bytes }),
92 Err(err) => Err(ParsingError::InvalidJson {
93 source: err,
94 message: bytes,
95 }),
96 }
97}
98
99const DOCKER_MESSAGE_SPLIT_THRESHOLD: usize = 16 * 1024; fn normalize_event(
102 log: &mut LogEvent,
103 log_namespace: LogNamespace,
104) -> Result<(), NormalizationError> {
105 let timestamp_key = match log_namespace {
107 LogNamespace::Vector => Some(OwnedTargetPath::metadata(lookup::owned_value_path!(
108 "kubernetes_logs",
109 "timestamp"
110 ))),
111 LogNamespace::Legacy => log_schema()
112 .timestamp_key()
113 .map(|path| OwnedTargetPath::event(path.clone())),
114 };
115
116 if let Some(timestamp_key) = timestamp_key {
117 let time = log.remove(×tamp_key).context(TimeFieldMissingSnafu)?;
118 let time = time
119 .as_str()
120 .ok_or(NormalizationError::TimeValueUnexpectedType)?;
121 let time = DateTime::parse_from_rfc3339(time.as_ref()).context(TimeParsingSnafu)?;
122 log_namespace.insert_source_metadata(
123 Config::NAME,
124 log,
125 log_schema().timestamp_key().map(LegacyKey::Overwrite),
126 path!("timestamp"),
127 time.with_timezone(&Utc),
128 );
129 }
130
131 let message_path = get_message_path(log_namespace);
133 let message = log.remove(&message_path).context(LogFieldMissingSnafu)?;
134 let mut message = match message {
135 Value::Bytes(val) => val,
136 _ => return Err(NormalizationError::LogValueUnexpectedType),
137 };
138 let mut is_partial = message.len() == DOCKER_MESSAGE_SPLIT_THRESHOLD;
150 if message.last().map(|&b| b as char == '\n').unwrap_or(false) {
151 message.truncate(message.len() - 1);
152 is_partial = false;
153 };
154 log.insert(&message_path, message);
155
156 if is_partial {
158 log_namespace.insert_source_metadata(
159 Config::NAME,
160 log,
161 Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
162 path!(event::PARTIAL),
163 true,
164 );
165 }
166
167 Ok(())
168}
169
170#[derive(Debug, Snafu)]
171enum ParsingError {
172 NoMessageField,
173 MessageFieldNotInBytes,
174 #[snafu(display(
175 "Could not parse json: {} in message {:?}",
176 source,
177 String::from_utf8_lossy(message)
178 ))]
179 InvalidJson {
180 source: serde_json::Error,
181 message: Bytes,
182 },
183 #[snafu(display("Message was not an object: {:?}", String::from_utf8_lossy(message)))]
184 NotAnObject {
185 message: Bytes,
186 },
187}
188
189#[derive(Debug, Snafu)]
190enum NormalizationError {
191 TimeFieldMissing,
192 TimeValueUnexpectedType,
193 TimeParsing { source: chrono::ParseError },
194 LogFieldMissing,
195 LogValueUnexpectedType,
196}
197
/// Fixtures and unit tests for the json-file parser. `valid_cases` and
/// `invalid_cases` are `pub`, so other test modules may share them.
#[cfg(test)]
pub mod tests {
    use vrl::value;

    use super::{super::test_util, *};
    use crate::test_util::trace_init;

    /// Returns the first `len` characters of `base` repeated end to end.
    fn make_long_string(base: &str, len: usize) -> String {
        base.chars().cycle().take(len).collect()
    }

    /// Well-formed json-file records paired with the event each should yield
    /// under `log_namespace`.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        // Message bodies sized around the 16 KiB split threshold.
        let partial_body = make_long_string("partial ", 16 * 1024);
        let newline_terminated_body = make_long_string("non-partial ", 16 * 1024 - 1);

        vec![
            // The trailing newline is stripped from the message.
            (
                Bytes::from(
                    r#"{"log": "The actual log line\n", "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                ),
                vec![test_util::make_log_event(
                    value!("The actual log line"),
                    "2016-10-05T00:00:30.082640485Z",
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // A short line without a newline passes through unchanged.
            (
                Bytes::from(
                    r#"{"log": "A line without newline char at the end", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                ),
                vec![test_util::make_log_event(
                    value!("A line without newline char at the end"),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            // Exactly at the split threshold with no newline: flagged partial.
            (
                Bytes::from(format!(
                    r#"{{"log": "{partial_body}", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}}"#
                )),
                vec![test_util::make_log_event(
                    value!(partial_body),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Reaches the threshold only because of its newline: not partial.
            (
                Bytes::from(format!(
                    r#"{{"log": "{newline_terminated_body}\n", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}}"#
                )),
                vec![test_util::make_log_event(
                    value!(newline_terminated_body),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Inputs the parser must drop rather than forward.
    pub fn invalid_cases() -> Vec<Bytes> {
        vec![
            // Not JSON at all.
            Bytes::from(""),
            Bytes::from("{"),
            Bytes::from("hello world"),
            // JSON, but not an object.
            Bytes::from("123"),
            // An object carrying none of the expected keys.
            Bytes::from("{}"),
            // `time` missing.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout"}"#),
            // `time` is not a string.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": 123}"#),
            // `time` is a string that is not an RFC 3339 timestamp.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": ""}"#),
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": "qwerty"}"#),
            // `log` missing.
            Bytes::from(r#"{"stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#),
            // `log` is not a string.
            Bytes::from(
                r#"{"log": 123, "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
            ),
        ]
    }

    /// Runs every invalid fixture through a fresh parser and asserts that none
    /// of them produces an output event.
    fn assert_all_rejected(log_namespace: LogNamespace, to_log: impl Fn(Bytes) -> LogEvent) {
        for bytes in invalid_cases() {
            let mut parser = Docker::new(log_namespace);
            let mut output = OutputBuffer::default();
            parser.transform(&mut output, to_log(bytes).into());

            assert!(output.is_empty(), "Expected no events: {output:?}");
        }
    }

    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }

    #[test]
    fn test_parsing_invalid_vector_namespace() {
        trace_init();

        assert_all_rejected(LogNamespace::Vector, |bytes| LogEvent::from(value!(bytes)));
    }

    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        assert_all_rejected(LogNamespace::Legacy, LogEvent::from);
    }
}