vector/sources/kubernetes_logs/parser/
docker.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use serde_json::Value as JsonValue;
4use snafu::{OptionExt, ResultExt, Snafu};
5use vector_lib::config::{LegacyKey, LogNamespace};
6use vector_lib::lookup::{self, path, OwnedTargetPath};
7
8use crate::sources::kubernetes_logs::transform_utils::get_message_path;
9use crate::{
10    config::log_schema,
11    event::{self, Event, LogEvent, Value},
12    internal_events::KubernetesLogsDockerFormatParseError,
13    sources::kubernetes_logs::Config,
14    transforms::{FunctionTransform, OutputBuffer},
15};
16
/// JSON key under which a Docker `json-file` record stores the log line.
pub const MESSAGE_KEY: &str = "log";
/// JSON key under which a Docker `json-file` record stores the stream name.
pub const STREAM_KEY: &str = "stream";
/// JSON key under which a Docker `json-file` record stores the timestamp.
pub const TIMESTAMP_KEY: &str = "time";
20
21/// Parser for the Docker log format.
22///
23/// Expects logs to arrive in a JSONLines format with the fields names and
24/// contents specific to the implementation of the Docker `json-file` log driver.
25///
26/// Normalizes parsed data for consistency.
27#[derive(Clone, Debug)]
28pub(super) struct Docker {
29    log_namespace: LogNamespace,
30}
31
32impl Docker {
33    pub const fn new(log_namespace: LogNamespace) -> Self {
34        Self { log_namespace }
35    }
36}
37
38impl FunctionTransform for Docker {
39    fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
40        let log = event.as_mut_log();
41        if let Err(err) = parse_json(log, self.log_namespace) {
42            emit!(KubernetesLogsDockerFormatParseError { error: &err });
43            return;
44        }
45        if let Err(err) = normalize_event(log, self.log_namespace) {
46            emit!(KubernetesLogsDockerFormatParseError { error: &err });
47            return;
48        }
49        output.push(event);
50    }
51}
52
53/// Parses `message` as json object and removes it.
54fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> {
55    let target_path = get_message_path(log_namespace);
56
57    let value = log
58        .remove(&target_path)
59        .ok_or(ParsingError::NoMessageField)?;
60
61    let bytes = match value {
62        Value::Bytes(bytes) => bytes,
63        _ => return Err(ParsingError::MessageFieldNotInBytes),
64    };
65
66    match serde_json::from_slice(bytes.as_ref()) {
67        Ok(JsonValue::Object(object)) => {
68            for (key, value) in object {
69                match key.as_str() {
70                    MESSAGE_KEY => drop(log.insert(&target_path, value)),
71                    STREAM_KEY => log_namespace.insert_source_metadata(
72                        Config::NAME,
73                        log,
74                        Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
75                        path!(STREAM_KEY),
76                        value,
77                    ),
78                    TIMESTAMP_KEY => log_namespace.insert_source_metadata(
79                        Config::NAME,
80                        log,
81                        log_schema().timestamp_key().map(LegacyKey::Overwrite),
82                        path!("timestamp"),
83                        value,
84                    ),
85                    _ => unreachable!("all json-file keys should be matched"),
86                };
87            }
88            Ok(())
89        }
90        Ok(_) => Err(ParsingError::NotAnObject { message: bytes }),
91        Err(err) => Err(ParsingError::InvalidJson {
92            source: err,
93            message: bytes,
94        }),
95    }
96}
97
/// Message length, in bytes, at which Docker splits a message into partial
/// ones (16 KiB).
const DOCKER_MESSAGE_SPLIT_THRESHOLD: usize = 16 * 1024;
99
100fn normalize_event(
101    log: &mut LogEvent,
102    log_namespace: LogNamespace,
103) -> Result<(), NormalizationError> {
104    // Parse timestamp.
105    let timestamp_key = match log_namespace {
106        LogNamespace::Vector => Some(OwnedTargetPath::metadata(lookup::owned_value_path!(
107            "kubernetes_logs",
108            "timestamp"
109        ))),
110        LogNamespace::Legacy => log_schema()
111            .timestamp_key()
112            .map(|path| OwnedTargetPath::event(path.clone())),
113    };
114
115    if let Some(timestamp_key) = timestamp_key {
116        let time = log.remove(&timestamp_key).context(TimeFieldMissingSnafu)?;
117        let time = time
118            .as_str()
119            .ok_or(NormalizationError::TimeValueUnexpectedType)?;
120        let time = DateTime::parse_from_rfc3339(time.as_ref()).context(TimeParsingSnafu)?;
121        log_namespace.insert_source_metadata(
122            Config::NAME,
123            log,
124            log_schema().timestamp_key().map(LegacyKey::Overwrite),
125            path!("timestamp"),
126            time.with_timezone(&Utc),
127        );
128    }
129
130    // Parse message, remove trailing newline and detect if it's partial.
131    let message_path = get_message_path(log_namespace);
132    let message = log.remove(&message_path).context(LogFieldMissingSnafu)?;
133    let mut message = match message {
134        Value::Bytes(val) => val,
135        _ => return Err(NormalizationError::LogValueUnexpectedType),
136    };
137    // Here we apply out heuristics to detect if message is partial.
138    // Partial messages are only split in docker at the maximum message length
139    // (`DOCKER_MESSAGE_SPLIT_THRESHOLD`).
140    // Thus, for a message to be partial it also has to have exactly that
141    // length.
142    // Now, whether that message will or won't actually be partial if it has
143    // exactly the max length is unknown. We consider all messages with the
144    // exact length of `DOCKER_MESSAGE_SPLIT_THRESHOLD` bytes partial
145    // by default, and then, if they end with newline - consider that
146    // an exception and make them non-partial.
147    // This is still not ideal, and can potentially be improved.
148    let mut is_partial = message.len() == DOCKER_MESSAGE_SPLIT_THRESHOLD;
149    if message.last().map(|&b| b as char == '\n').unwrap_or(false) {
150        message.truncate(message.len() - 1);
151        is_partial = false;
152    };
153    log.insert(&message_path, message);
154
155    // For partial messages add a partial event indicator.
156    if is_partial {
157        log_namespace.insert_source_metadata(
158            Config::NAME,
159            log,
160            Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
161            path!(event::PARTIAL),
162            true,
163        );
164    }
165
166    Ok(())
167}
168
169#[derive(Debug, Snafu)]
170enum ParsingError {
171    NoMessageField,
172    MessageFieldNotInBytes,
173    #[snafu(display(
174        "Could not parse json: {} in message {:?}",
175        source,
176        String::from_utf8_lossy(message)
177    ))]
178    InvalidJson {
179        source: serde_json::Error,
180        message: Bytes,
181    },
182    #[snafu(display("Message was not an object: {:?}", String::from_utf8_lossy(message)))]
183    NotAnObject {
184        message: Bytes,
185    },
186}
187
188#[derive(Debug, Snafu)]
189enum NormalizationError {
190    TimeFieldMissing,
191    TimeValueUnexpectedType,
192    TimeParsing { source: chrono::ParseError },
193    LogFieldMissing,
194    LogValueUnexpectedType,
195}
196
197#[cfg(test)]
198pub mod tests {
199    use super::{super::test_util, *};
200    use crate::test_util::trace_init;
201    use vrl::value;
202
203    fn make_long_string(base: &str, len: usize) -> String {
204        base.chars().cycle().take(len).collect()
205    }
206
207    /// Shared test cases.
208    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
209        vec![
210            (
211                Bytes::from(
212                    r#"{"log": "The actual log line\n", "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
213                ),
214                vec![test_util::make_log_event(
215                    value!("The actual log line"),
216                    "2016-10-05T00:00:30.082640485Z",
217                    "stderr",
218                    false,
219                    log_namespace,
220                )],
221            ),
222            (
223                Bytes::from(
224                    r#"{"log": "A line without newline char at the end", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
225                ),
226                vec![test_util::make_log_event(
227                    value!("A line without newline char at the end"),
228                    "2016-10-05T00:00:30.082640485Z",
229                    "stdout",
230                    false,
231                    log_namespace,
232                )],
233            ),
234            // Partial message due to message length.
235            (
236                Bytes::from(
237                    [
238                        r#"{"log": ""#,
239                        make_long_string("partial ", 16 * 1024).as_str(),
240                        r#"", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
241                    ]
242                    .join(""),
243                ),
244                vec![test_util::make_log_event(
245                    value!(make_long_string("partial ", 16 * 1024)),
246                    "2016-10-05T00:00:30.082640485Z",
247                    "stdout",
248                    true,
249                    log_namespace,
250                )],
251            ),
252            // Non-partial message, because message length matches but
253            // the message also ends with newline.
254            (
255                Bytes::from(
256                    [
257                        r#"{"log": ""#,
258                        make_long_string("non-partial ", 16 * 1024 - 1).as_str(),
259                        r"\n",
260                        r#"", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
261                    ]
262                    .join(""),
263                ),
264                vec![test_util::make_log_event(
265                    value!(make_long_string("non-partial ", 16 * 1024 - 1)),
266                    "2016-10-05T00:00:30.082640485Z",
267                    "stdout",
268                    false,
269                    log_namespace,
270                )],
271            ),
272        ]
273    }
274
275    pub fn invalid_cases() -> Vec<Bytes> {
276        vec![
277            // Empty string.
278            Bytes::from(""),
279            // Incomplete.
280            Bytes::from("{"),
281            // Random non-JSON text.
282            Bytes::from("hello world"),
283            // Random JSON non-object.
284            Bytes::from("123"),
285            // Empty JSON object.
286            Bytes::from("{}"),
287            // No timestamp.
288            Bytes::from(r#"{"log": "Hello world", "stream": "stdout"}"#),
289            // Timestamp not a string.
290            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": 123}"#),
291            // Empty timestamp.
292            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": ""}"#),
293            // Invalid timestamp.
294            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": "qwerty"}"#),
295            // No log field.
296            Bytes::from(r#"{"stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#),
297            // Log is not a string.
298            Bytes::from(
299                r#"{"log": 123, "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
300            ),
301        ]
302    }
303
304    #[test]
305    fn test_parsing_valid_vector_namespace() {
306        trace_init();
307
308        test_util::test_parser(
309            || Docker {
310                log_namespace: LogNamespace::Vector,
311            },
312            |bytes| Event::Log(LogEvent::from(value!(bytes))),
313            valid_cases(LogNamespace::Vector),
314        );
315    }
316
317    #[test]
318    fn test_parsing_valid_legacy_namespace() {
319        trace_init();
320
321        test_util::test_parser(
322            || Docker {
323                log_namespace: LogNamespace::Legacy,
324            },
325            |bytes| Event::Log(LogEvent::from(bytes)),
326            valid_cases(LogNamespace::Legacy),
327        );
328    }
329
330    #[test]
331    fn test_parsing_invalid_vector_namespace() {
332        trace_init();
333
334        let cases = invalid_cases();
335
336        for bytes in cases {
337            let mut parser = Docker::new(LogNamespace::Vector);
338            let input = LogEvent::from(value!(bytes));
339            let mut output = OutputBuffer::default();
340            parser.transform(&mut output, input.into());
341
342            assert!(output.is_empty(), "Expected no events: {output:?}");
343        }
344    }
345
346    #[test]
347    fn test_parsing_invalid_legacy_namespace() {
348        trace_init();
349
350        let cases = invalid_cases();
351
352        for bytes in cases {
353            let mut parser = Docker::new(LogNamespace::Legacy);
354            let input = LogEvent::from(bytes);
355            let mut output = OutputBuffer::default();
356            parser.transform(&mut output, input.into());
357
358            assert!(output.is_empty(), "Expected no events: {output:?}");
359        }
360    }
361}