1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use serde_json::Value as JsonValue;
4use snafu::{OptionExt, ResultExt, Snafu};
5use vector_lib::config::{LegacyKey, LogNamespace};
6use vector_lib::lookup::{self, path, OwnedTargetPath};
7
8use crate::sources::kubernetes_logs::transform_utils::get_message_path;
9use crate::{
10 config::log_schema,
11 event::{self, Event, LogEvent, Value},
12 internal_events::KubernetesLogsDockerFormatParseError,
13 sources::kubernetes_logs::Config,
14 transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Key of the field that holds the log line in a Docker `json-file` record.
pub const MESSAGE_KEY: &str = "log";
/// Key of the field that names the output stream (e.g. `stdout` or `stderr`).
pub const STREAM_KEY: &str = "stream";
/// Key of the field that holds the record's RFC 3339 timestamp.
pub const TIMESTAMP_KEY: &str = "time";
20
21#[derive(Clone, Debug)]
28pub(super) struct Docker {
29 log_namespace: LogNamespace,
30}
31
32impl Docker {
33 pub const fn new(log_namespace: LogNamespace) -> Self {
34 Self { log_namespace }
35 }
36}
37
38impl FunctionTransform for Docker {
39 fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
40 let log = event.as_mut_log();
41 if let Err(err) = parse_json(log, self.log_namespace) {
42 emit!(KubernetesLogsDockerFormatParseError { error: &err });
43 return;
44 }
45 if let Err(err) = normalize_event(log, self.log_namespace) {
46 emit!(KubernetesLogsDockerFormatParseError { error: &err });
47 return;
48 }
49 output.push(event);
50 }
51}
52
53fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> {
55 let target_path = get_message_path(log_namespace);
56
57 let value = log
58 .remove(&target_path)
59 .ok_or(ParsingError::NoMessageField)?;
60
61 let bytes = match value {
62 Value::Bytes(bytes) => bytes,
63 _ => return Err(ParsingError::MessageFieldNotInBytes),
64 };
65
66 match serde_json::from_slice(bytes.as_ref()) {
67 Ok(JsonValue::Object(object)) => {
68 for (key, value) in object {
69 match key.as_str() {
70 MESSAGE_KEY => drop(log.insert(&target_path, value)),
71 STREAM_KEY => log_namespace.insert_source_metadata(
72 Config::NAME,
73 log,
74 Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
75 path!(STREAM_KEY),
76 value,
77 ),
78 TIMESTAMP_KEY => log_namespace.insert_source_metadata(
79 Config::NAME,
80 log,
81 log_schema().timestamp_key().map(LegacyKey::Overwrite),
82 path!("timestamp"),
83 value,
84 ),
85 _ => unreachable!("all json-file keys should be matched"),
86 };
87 }
88 Ok(())
89 }
90 Ok(_) => Err(ParsingError::NotAnObject { message: bytes }),
91 Err(err) => Err(ParsingError::InvalidJson {
92 source: err,
93 message: bytes,
94 }),
95 }
96}
97
98const DOCKER_MESSAGE_SPLIT_THRESHOLD: usize = 16 * 1024; fn normalize_event(
101 log: &mut LogEvent,
102 log_namespace: LogNamespace,
103) -> Result<(), NormalizationError> {
104 let timestamp_key = match log_namespace {
106 LogNamespace::Vector => Some(OwnedTargetPath::metadata(lookup::owned_value_path!(
107 "kubernetes_logs",
108 "timestamp"
109 ))),
110 LogNamespace::Legacy => log_schema()
111 .timestamp_key()
112 .map(|path| OwnedTargetPath::event(path.clone())),
113 };
114
115 if let Some(timestamp_key) = timestamp_key {
116 let time = log.remove(×tamp_key).context(TimeFieldMissingSnafu)?;
117 let time = time
118 .as_str()
119 .ok_or(NormalizationError::TimeValueUnexpectedType)?;
120 let time = DateTime::parse_from_rfc3339(time.as_ref()).context(TimeParsingSnafu)?;
121 log_namespace.insert_source_metadata(
122 Config::NAME,
123 log,
124 log_schema().timestamp_key().map(LegacyKey::Overwrite),
125 path!("timestamp"),
126 time.with_timezone(&Utc),
127 );
128 }
129
130 let message_path = get_message_path(log_namespace);
132 let message = log.remove(&message_path).context(LogFieldMissingSnafu)?;
133 let mut message = match message {
134 Value::Bytes(val) => val,
135 _ => return Err(NormalizationError::LogValueUnexpectedType),
136 };
137 let mut is_partial = message.len() == DOCKER_MESSAGE_SPLIT_THRESHOLD;
149 if message.last().map(|&b| b as char == '\n').unwrap_or(false) {
150 message.truncate(message.len() - 1);
151 is_partial = false;
152 };
153 log.insert(&message_path, message);
154
155 if is_partial {
157 log_namespace.insert_source_metadata(
158 Config::NAME,
159 log,
160 Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
161 path!(event::PARTIAL),
162 true,
163 );
164 }
165
166 Ok(())
167}
168
169#[derive(Debug, Snafu)]
170enum ParsingError {
171 NoMessageField,
172 MessageFieldNotInBytes,
173 #[snafu(display(
174 "Could not parse json: {} in message {:?}",
175 source,
176 String::from_utf8_lossy(message)
177 ))]
178 InvalidJson {
179 source: serde_json::Error,
180 message: Bytes,
181 },
182 #[snafu(display("Message was not an object: {:?}", String::from_utf8_lossy(message)))]
183 NotAnObject {
184 message: Bytes,
185 },
186}
187
188#[derive(Debug, Snafu)]
189enum NormalizationError {
190 TimeFieldMissing,
191 TimeValueUnexpectedType,
192 TimeParsing { source: chrono::ParseError },
193 LogFieldMissing,
194 LogValueUnexpectedType,
195}
196
#[cfg(test)]
pub mod tests {
    use super::{super::test_util, *};
    use crate::test_util::trace_init;
    use vrl::value;

    /// Timestamp shared by every valid fixture.
    const TIME: &str = "2016-10-05T00:00:30.082640485Z";

    /// Returns exactly `len` chars taken from `base` repeated end to end.
    fn make_long_string(base: &str, len: usize) -> String {
        let mut out = String::with_capacity(len);
        out.extend(base.chars().cycle().take(len));
        out
    }

    /// Renders a `json-file` record stamped with `TIME`.
    ///
    /// `log` is inserted verbatim, so it must already be JSON-escaped: a
    /// newline is written as the two characters `\` and `n`.
    fn json_line(log: &str, stream: &str) -> Bytes {
        Bytes::from(format!(
            r#"{{"log": "{log}", "stream": "{stream}", "time": "{TIME}"}}"#
        ))
    }

    /// Well-formed records paired with the events the parser should produce.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        let partial = make_long_string("partial ", 16 * 1024);
        let non_partial = make_long_string("non-partial ", 16 * 1024 - 1);

        vec![
            // Complete line: the trailing newline is stripped.
            (
                json_line(r"The actual log line\n", "stderr"),
                vec![test_util::make_log_event(
                    value!("The actual log line"),
                    TIME,
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // No trailing newline, but shorter than a split chunk: not partial.
            (
                json_line("A line without newline char at the end", "stdout"),
                vec![test_util::make_log_event(
                    value!("A line without newline char at the end"),
                    TIME,
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            // Exactly one split chunk with no trailing newline: partial.
            (
                json_line(&partial, "stdout"),
                vec![test_util::make_log_event(
                    value!(partial),
                    TIME,
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // One split chunk including the trailing newline: not partial.
            (
                json_line(&format!(r"{non_partial}\n"), "stdout"),
                vec![test_util::make_log_event(
                    value!(non_partial),
                    TIME,
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Inputs that the parser must reject without emitting an event.
    pub fn invalid_cases() -> Vec<Bytes> {
        [
            // Empty input.
            "",
            // Truncated JSON.
            "{",
            // Not JSON at all.
            "hello world",
            // JSON, but not an object.
            "123",
            // Object without any of the expected fields.
            "{}",
            // Missing `time`.
            r#"{"log": "Hello world", "stream": "stdout"}"#,
            // `time` is not a string.
            r#"{"log": "Hello world", "stream": "stdout", "time": 123}"#,
            // `time` is empty.
            r#"{"log": "Hello world", "stream": "stdout", "time": ""}"#,
            // `time` is not RFC 3339.
            r#"{"log": "Hello world", "stream": "stdout", "time": "qwerty"}"#,
            // Missing `log`.
            r#"{"stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
            // `log` is not a string.
            r#"{"log": 123, "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
        ]
        .into_iter()
        .map(Bytes::from)
        .collect()
    }

    /// Feeds every `invalid_cases` input through a fresh parser and asserts
    /// that nothing reaches the output.
    fn assert_invalid_cases_rejected(
        log_namespace: LogNamespace,
        to_log: impl Fn(Bytes) -> LogEvent,
    ) {
        for bytes in invalid_cases() {
            let mut parser = Docker::new(log_namespace);
            let mut output = OutputBuffer::default();
            parser.transform(&mut output, to_log(bytes).into());

            assert!(output.is_empty(), "Expected no events: {output:?}");
        }
    }

    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }

    #[test]
    fn test_parsing_invalid_vector_namespace() {
        trace_init();

        assert_invalid_cases_rejected(LogNamespace::Vector, |bytes| {
            LogEvent::from(value!(bytes))
        });
    }

    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        assert_invalid_cases_rejected(LogNamespace::Legacy, LogEvent::from);
    }
}