// vector/sources/kubernetes_logs/parser/docker.rs

1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use serde_json::Value as JsonValue;
4use snafu::{OptionExt, ResultExt, Snafu};
5use vector_lib::{
6    config::{LegacyKey, LogNamespace},
7    lookup::{self, OwnedTargetPath, path},
8};
9
10use crate::{
11    config::log_schema,
12    event::{self, Event, LogEvent, Value},
13    internal_events::KubernetesLogsDockerFormatParseError,
14    sources::kubernetes_logs::{Config, transform_utils::get_message_path},
15    transforms::{FunctionTransform, OutputBuffer},
16};
17
/// JSON key holding the log line itself in a Docker `json-file` entry.
pub const MESSAGE_KEY: &str = "log";
/// JSON key naming the stream the line was written to (e.g. `stdout` or `stderr`).
pub const STREAM_KEY: &str = "stream";
/// JSON key holding the entry's timestamp, expected in RFC 3339 form.
pub const TIMESTAMP_KEY: &str = "time";
21
22/// Parser for the Docker log format.
23///
24/// Expects logs to arrive in a JSONLines format with the fields names and
25/// contents specific to the implementation of the Docker `json-file` log driver.
26///
27/// Normalizes parsed data for consistency.
28#[derive(Clone, Debug)]
29pub(super) struct Docker {
30    log_namespace: LogNamespace,
31}
32
33impl Docker {
34    pub const fn new(log_namespace: LogNamespace) -> Self {
35        Self { log_namespace }
36    }
37}
38
39impl FunctionTransform for Docker {
40    fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
41        let log = event.as_mut_log();
42        if let Err(err) = parse_json(log, self.log_namespace) {
43            emit!(KubernetesLogsDockerFormatParseError { error: &err });
44            return;
45        }
46        if let Err(err) = normalize_event(log, self.log_namespace) {
47            emit!(KubernetesLogsDockerFormatParseError { error: &err });
48            return;
49        }
50        output.push(event);
51    }
52}
53
54/// Parses `message` as json object and removes it.
55fn parse_json(log: &mut LogEvent, log_namespace: LogNamespace) -> Result<(), ParsingError> {
56    let target_path = get_message_path(log_namespace);
57
58    let value = log
59        .remove(&target_path)
60        .ok_or(ParsingError::NoMessageField)?;
61
62    let bytes = match value {
63        Value::Bytes(bytes) => bytes,
64        _ => return Err(ParsingError::MessageFieldNotInBytes),
65    };
66
67    match serde_json::from_slice(bytes.as_ref()) {
68        Ok(JsonValue::Object(object)) => {
69            for (key, value) in object {
70                match key.as_str() {
71                    MESSAGE_KEY => drop(log.insert(&target_path, value)),
72                    STREAM_KEY => log_namespace.insert_source_metadata(
73                        Config::NAME,
74                        log,
75                        Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
76                        path!(STREAM_KEY),
77                        value,
78                    ),
79                    TIMESTAMP_KEY => log_namespace.insert_source_metadata(
80                        Config::NAME,
81                        log,
82                        log_schema().timestamp_key().map(LegacyKey::Overwrite),
83                        path!("timestamp"),
84                        value,
85                    ),
86                    _ => unreachable!("all json-file keys should be matched"),
87                };
88            }
89            Ok(())
90        }
91        Ok(_) => Err(ParsingError::NotAnObject { message: bytes }),
92        Err(err) => Err(ParsingError::InvalidJson {
93            source: err,
94            message: bytes,
95        }),
96    }
97}
98
/// Length in bytes (16 KiB) at which Docker splits a long line into several
/// `json-file` entries; used by `normalize_event` to detect partial messages.
const DOCKER_MESSAGE_SPLIT_THRESHOLD: usize = 16_384;
100
101fn normalize_event(
102    log: &mut LogEvent,
103    log_namespace: LogNamespace,
104) -> Result<(), NormalizationError> {
105    // Parse timestamp.
106    let timestamp_key = match log_namespace {
107        LogNamespace::Vector => Some(OwnedTargetPath::metadata(lookup::owned_value_path!(
108            "kubernetes_logs",
109            "timestamp"
110        ))),
111        LogNamespace::Legacy => log_schema()
112            .timestamp_key()
113            .map(|path| OwnedTargetPath::event(path.clone())),
114    };
115
116    if let Some(timestamp_key) = timestamp_key {
117        let time = log.remove(&timestamp_key).context(TimeFieldMissingSnafu)?;
118        let time = time
119            .as_str()
120            .ok_or(NormalizationError::TimeValueUnexpectedType)?;
121        let time = DateTime::parse_from_rfc3339(time.as_ref()).context(TimeParsingSnafu)?;
122        log_namespace.insert_source_metadata(
123            Config::NAME,
124            log,
125            log_schema().timestamp_key().map(LegacyKey::Overwrite),
126            path!("timestamp"),
127            time.with_timezone(&Utc),
128        );
129    }
130
131    // Parse message, remove trailing newline and detect if it's partial.
132    let message_path = get_message_path(log_namespace);
133    let message = log.remove(&message_path).context(LogFieldMissingSnafu)?;
134    let mut message = match message {
135        Value::Bytes(val) => val,
136        _ => return Err(NormalizationError::LogValueUnexpectedType),
137    };
138    // Here we apply out heuristics to detect if message is partial.
139    // Partial messages are only split in docker at the maximum message length
140    // (`DOCKER_MESSAGE_SPLIT_THRESHOLD`).
141    // Thus, for a message to be partial it also has to have exactly that
142    // length.
143    // Now, whether that message will or won't actually be partial if it has
144    // exactly the max length is unknown. We consider all messages with the
145    // exact length of `DOCKER_MESSAGE_SPLIT_THRESHOLD` bytes partial
146    // by default, and then, if they end with newline - consider that
147    // an exception and make them non-partial.
148    // This is still not ideal, and can potentially be improved.
149    let mut is_partial = message.len() == DOCKER_MESSAGE_SPLIT_THRESHOLD;
150    if message.last().map(|&b| b as char == '\n').unwrap_or(false) {
151        message.truncate(message.len() - 1);
152        is_partial = false;
153    };
154    log.insert(&message_path, message);
155
156    // For partial messages add a partial event indicator.
157    if is_partial {
158        log_namespace.insert_source_metadata(
159            Config::NAME,
160            log,
161            Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
162            path!(event::PARTIAL),
163            true,
164        );
165    }
166
167    Ok(())
168}
169
/// Errors returned by `parse_json` while decoding the raw Docker `json-file` line.
//
// NOTE(review): variant notes below deliberately use `//` rather than `///`.
// snafu can use a variant's doc comment as its default `Display` text when no
// `display` attribute is given, so `///` here could change emitted messages.
#[derive(Debug, Snafu)]
enum ParsingError {
    // The event had nothing at the message path.
    NoMessageField,
    // The message existed but was not a byte string.
    MessageFieldNotInBytes,
    // The message was not valid JSON; keeps the serde error and the raw input.
    #[snafu(display(
        "Could not parse json: {} in message {:?}",
        source,
        String::from_utf8_lossy(message)
    ))]
    InvalidJson {
        source: serde_json::Error,
        message: Bytes,
    },
    // The message was valid JSON but not an object; keeps the raw input.
    #[snafu(display("Message was not an object: {:?}", String::from_utf8_lossy(message)))]
    NotAnObject {
        message: Bytes,
    },
}
188
/// Errors returned by `normalize_event` after the JSON has been decoded.
//
// NOTE(review): variant notes use `//` on purpose. snafu may use a variant's
// `///` doc comment as its default `Display` text, which would change the
// emitted messages.
#[derive(Debug, Snafu)]
enum NormalizationError {
    // Nothing was found at the timestamp location.
    TimeFieldMissing,
    // The timestamp value could not be read as a string.
    TimeValueUnexpectedType,
    // The timestamp string was not valid RFC 3339.
    TimeParsing { source: chrono::ParseError },
    // Nothing was found at the message path.
    LogFieldMissing,
    // The message value was not a byte string.
    LogValueUnexpectedType,
}
197
#[cfg(test)]
pub mod tests {
    //! Unit tests for the Docker `json-file` parser.
    //!
    //! `valid_cases` and `invalid_cases` are `pub`, presumably so other parser
    //! tests can reuse these fixtures — confirm against callers.

    use vrl::value;

    use super::{super::test_util, *};
    use crate::test_util::trace_init;

    /// Returns the first `len` characters of `base` repeated endlessly.
    fn make_long_string(base: &str, len: usize) -> String {
        base.chars().cycle().take(len).collect()
    }

    /// Shared test cases.
    ///
    /// Each entry pairs a raw Docker `json-file` line with the events the
    /// parser is expected to emit for it under `log_namespace`.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        vec![
            // The JSON-escaped trailing `\n` is stripped from the message.
            (
                Bytes::from(
                    r#"{"log": "The actual log line\n", "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                ),
                vec![test_util::make_log_event(
                    value!("The actual log line"),
                    "2016-10-05T00:00:30.082640485Z",
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // A message without a trailing newline is kept unchanged.
            (
                Bytes::from(
                    r#"{"log": "A line without newline char at the end", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                ),
                vec![test_util::make_log_event(
                    value!("A line without newline char at the end"),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            // Partial message due to message length.
            // The message is exactly 16 * 1024 ASCII bytes with no trailing
            // newline, so it is flagged as partial.
            (
                Bytes::from(
                    [
                        r#"{"log": ""#,
                        make_long_string("partial ", 16 * 1024).as_str(),
                        r#"", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                    ]
                    .join(""),
                ),
                vec![test_util::make_log_event(
                    value!(make_long_string("partial ", 16 * 1024)),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Non-partial message, because message length matches but
            // the message also ends with newline.
            // `r"\n"` is the two-character JSON escape, so the decoded message
            // is (16 * 1024 - 1) ASCII bytes plus one newline byte.
            (
                Bytes::from(
                    [
                        r#"{"log": ""#,
                        make_long_string("non-partial ", 16 * 1024 - 1).as_str(),
                        r"\n",
                        r#"", "stream": "stdout", "time": "2016-10-05T00:00:30.082640485Z"}"#,
                    ]
                    .join(""),
                ),
                vec![test_util::make_log_event(
                    value!(make_long_string("non-partial ", 16 * 1024 - 1)),
                    "2016-10-05T00:00:30.082640485Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Raw lines the parser must reject: the tests below assert that no
    /// event is emitted for any of them.
    pub fn invalid_cases() -> Vec<Bytes> {
        vec![
            // Empty string.
            Bytes::from(""),
            // Incomplete.
            Bytes::from("{"),
            // Random non-JSON text.
            Bytes::from("hello world"),
            // Random JSON non-object.
            Bytes::from("123"),
            // Empty JSON object.
            Bytes::from("{}"),
            // No timestamp.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout"}"#),
            // Timestamp not a string.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": 123}"#),
            // Empty timestamp.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": ""}"#),
            // Invalid timestamp.
            Bytes::from(r#"{"log": "Hello world", "stream": "stdout", "time": "qwerty"}"#),
            // No log field.
            Bytes::from(r#"{"stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#),
            // Log is not a string.
            Bytes::from(
                r#"{"log": 123, "stream": "stderr", "time": "2016-10-05T00:00:30.082640485Z"}"#,
            ),
        ]
    }

    // Vector namespace: the raw line is supplied as the event's root value.
    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker {
                log_namespace: LogNamespace::Vector,
            },
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    // Legacy namespace: the raw line is wrapped with `LogEvent::from(bytes)`
    // (presumably placing it under the legacy message key — confirm).
    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();

        test_util::test_parser(
            || Docker {
                log_namespace: LogNamespace::Legacy,
            },
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }

    // Every invalid case must produce an empty output buffer (event dropped).
    #[test]
    fn test_parsing_invalid_vector_namespace() {
        trace_init();

        let cases = invalid_cases();

        for bytes in cases {
            let mut parser = Docker::new(LogNamespace::Vector);
            let input = LogEvent::from(value!(bytes));
            let mut output = OutputBuffer::default();
            parser.transform(&mut output, input.into());

            assert!(output.is_empty(), "Expected no events: {output:?}");
        }
    }

    // Same as above, using the legacy namespace input shape.
    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        let cases = invalid_cases();

        for bytes in cases {
            let mut parser = Docker::new(LogNamespace::Legacy);
            let input = LogEvent::from(bytes);
            let mut output = OutputBuffer::default();
            parser.transform(&mut output, input.into());

            assert!(output.is_empty(), "Expected no events: {output:?}");
        }
    }
}