vector/sources/kubernetes_logs/parser/
cri.rs

1use chrono::{DateTime, Utc};
2use derivative::Derivative;
3use vector_lib::{
4    config::{LegacyKey, LogNamespace, log_schema},
5    conversion,
6    lookup::path,
7};
8
9use crate::{
10    event::{self, Event, Value},
11    internal_events::{
12        DROP_EVENT, ParserConversionError, ParserMatchError, ParserMissingFieldError,
13    },
14    sources::kubernetes_logs::{Config, transform_utils::get_message_path},
15    transforms::{FunctionTransform, OutputBuffer},
16};
17
/// Key under which the CRI stream name (`stdout` or `stderr`) is inserted into the event.
const STREAM_KEY: &str = "stream";
/// Metadata key for the timestamp parsed out of the CRI line; also used as the field name
/// reported when that timestamp fails to convert.
const TIMESTAMP_KEY: &str = "timestamp";
20
/// Parser for the CRI log format.
///
/// Expects logs to arrive in a CRI log format.
///
/// CRI log format ([documentation][cri_log_format]) is a simple
/// newline-separated text format. Each line is split into its space-delimited
/// fields by a hand-written byte parser (`parse_log_line`) rather than a
/// regular expression.
///
/// Normalizes parsed data for consistency.
///
/// [cri_log_format]: https://github.com/kubernetes/community/blob/ee2abbf9dbfa4523b414f99a04ddc97bd38c74b2/contributors/design-proposals/node/kubelet-cri-logging.md
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub(super) struct Cri {
    /// Log namespace used to look up the message path and to insert source metadata.
    log_namespace: LogNamespace,
}
36
37impl Cri {
38    pub const fn new(log_namespace: LogNamespace) -> Self {
39        Self { log_namespace }
40    }
41}
42
43impl FunctionTransform for Cri {
44    fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
45        let message_path = get_message_path(self.log_namespace);
46
47        // Get the log field with the message, if it exists, and coerce it to bytes.
48        let log = event.as_mut_log();
49        let value = log.remove(&message_path).map(|s| s.coerce_to_bytes());
50        match value {
51            None => {
52                // The message field was missing, inexplicably. If we can't find the message field, there's nothing for
53                // us to actually decode, so there's no event we could emit, and so we just emit the error and return.
54                emit!(ParserMissingFieldError::<DROP_EVENT> {
55                    field: &message_path.to_string()
56                });
57                return;
58            }
59            Some(s) => match parse_log_line(&s) {
60                None => {
61                    emit!(ParserMatchError { value: &s[..] });
62                    return;
63                }
64                Some(parsed_log) => {
65                    // For all fields except `timestamp`, simply treat them as `Value::Bytes`. For
66                    // `timestamp`, however, we actually make sure we can convert it correctly and feed it
67                    // in as `Value::Timestamp`.
68
69                    // MESSAGE
70                    // Insert either directly into `.` or `log_schema().message_key()`,
71                    // overwriting the original "full" CRI log that included additional fields.
72                    drop(log.insert(&message_path, Value::Bytes(s.slice_ref(parsed_log.message))));
73
74                    // MULTILINE_TAG
75                    // If the MULTILINE_TAG is 'P' (partial), insert our generic `_partial` key.
76                    // This is safe to `unwrap()` as we've just ensured this value is a Value::Bytes
77                    // during the above capturing and mapping.
78                    if parsed_log.multiline_tag[0] == b'P' {
79                        self.log_namespace.insert_source_metadata(
80                            Config::NAME,
81                            log,
82                            Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
83                            path!(event::PARTIAL),
84                            true,
85                        );
86                    }
87
88                    // TIMESTAMP_TAG
89                    let ds = String::from_utf8_lossy(parsed_log.timestamp);
90                    match DateTime::parse_from_str(&ds, "%+") {
91                        Ok(dt) =>
92                        // Insert the TIMESTAMP_TAG parsed out of the CRI log, this is the timestamp of
93                        // when the runtime processed this message.
94                        {
95                            self.log_namespace.insert_source_metadata(
96                                Config::NAME,
97                                log,
98                                log_schema().timestamp_key().map(LegacyKey::Overwrite),
99                                path!(TIMESTAMP_KEY),
100                                Value::Timestamp(dt.with_timezone(&Utc)),
101                            )
102                        }
103                        Err(e) => {
104                            emit!(ParserConversionError {
105                                name: TIMESTAMP_KEY,
106                                error: conversion::Error::TimestampParse {
107                                    s: ds.to_string(),
108                                    source: e,
109                                },
110                            });
111                        }
112                    }
113
114                    // STREAM_TAG
115                    self.log_namespace.insert_source_metadata(
116                        Config::NAME,
117                        log,
118                        Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
119                        path!(STREAM_KEY),
120                        Value::Bytes(s.slice_ref(parsed_log.stream)),
121                    );
122                }
123            },
124        }
125
126        output.push(event);
127    }
128}
129
/// Borrowed views of the four fields of one CRI log line.
///
/// Every slice points into the buffer passed to [`parse_log_line`], and none of them includes
/// the space that terminated the field.
struct ParsedLog<'a> {
    /// Everything before the first space; not validated by the parser.
    timestamp: &'a [u8],
    /// Either `stdout` or `stderr`.
    stream: &'a [u8],
    /// Either `F` or `P`.
    multiline_tag: &'a [u8],
    /// The remainder of the line, minus one trailing `\n` if there was one.
    message: &'a [u8],
}

// Takes `&u8` so it can be handed directly to slice/iterator adaptors, which pass their
// items by reference.
#[allow(clippy::trivially_copy_pass_by_ref)]
#[inline]
const fn is_delimiter(c: &u8) -> bool {
    matches!(*c, b' ')
}

/// Parses a CRI log line.
///
/// Accepts lines shaped like the regex
/// `(?-u)^(?P<timestamp>.*) (?P<stream>(stdout|stderr)) (?P<multiline_tag>(P|F)) (?P<message>.*)(?P<new_line_tag>\n?)$`,
/// with the fields taken by splitting on the first three spaces (so the timestamp is
/// everything before the first space).
///
/// Returns `None` when fewer than three spaces are present, when the stream is not
/// `stdout`/`stderr`, or when the multiline tag is not `F`/`P`.
#[inline]
fn parse_log_line(line: &[u8]) -> Option<ParsedLog<'_>> {
    // Three delimiters give four fields; the fourth keeps any further spaces untouched.
    let mut fields = line.splitn(4, is_delimiter);

    let timestamp = fields.next()?;

    let stream = fields.next()?;
    if stream != b"stdout" && stream != b"stderr" {
        return None;
    }

    let multiline_tag = fields.next()?;
    if multiline_tag != b"F" && multiline_tag != b"P" {
        return None;
    }

    // Remove the newline tag field, if it exists.
    // For additional details, see https://github.com/vectordotdev/vector/issues/8606.
    let body = fields.next()?;
    let message = body.strip_suffix(b"\n").unwrap_or(body);

    Some(ParsedLog {
        timestamp,
        stream,
        multiline_tag,
        message,
    })
}
184
#[cfg(test)]
pub mod tests {
    use bytes::Bytes;
    use vrl::value;

    use super::{super::test_util, *};
    use crate::{event::LogEvent, test_util::trace_init};

    /// Message bytes of the invalid-UTF-8 fixture: `Hello World Привет Ми` followed by a lone
    /// `0xd1`, i.e. a multi-byte character cut off after its first byte.
    const INVALID_UTF8_MESSAGE: &[u8] =
        b"Hello World \xd0\x9f\xd1\x80\xd0\xb8\xd0\xb2\xd0\xb5\xd1\x82 \xd0\x9c\xd0\xb8\xd1";

    /// Repeats the characters of `base` cyclically until `len` characters have been produced
    /// (or yields an empty string when `base` is empty).
    fn make_long_string(base: &str, len: usize) -> String {
        let mut repeated = String::with_capacity(len);
        repeated.extend(base.chars().cycle().take(len));
        repeated
    }

    /// Pairs a raw CRI line with the single event expected from parsing it.
    fn case(
        line: impl Into<Bytes>,
        message: Value,
        timestamp: &'static str,
        stream: &'static str,
        is_partial: bool,
        log_namespace: LogNamespace,
    ) -> (Bytes, Vec<Event>) {
        let expected =
            test_util::make_log_event(message, timestamp, stream, is_partial, log_namespace);
        (line.into(), vec![expected])
    }

    /// Shared test cases.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        // A part of the partial message with a realistic length.
        let long_message = make_long_string("very long message ", 16 * 1024);
        let long_line = format!("2016-10-06T00:17:10.113242941Z stdout P {long_message}");

        // This is not valid UTF-8 string, ends with \n
        // 2021-08-05T17:35:26.640507539Z stdout P Hello World Привет Ми\xd1\n
        let mut invalid_utf8_line = b"2021-08-05T17:35:26.640507539Z stdout P ".to_vec();
        invalid_utf8_line.extend_from_slice(INVALID_UTF8_MESSAGE);
        invalid_utf8_line.push(b'\n');

        vec![
            case(
                "2016-10-06T00:17:09.669794202Z stdout F The content of the log entry 1",
                value!("The content of the log entry 1"),
                "2016-10-06T00:17:09.669794202Z",
                "stdout",
                false,
                log_namespace,
            ),
            case(
                "2016-10-06T00:17:09.669794202Z stdout P First line of log entry 2",
                value!("First line of log entry 2"),
                "2016-10-06T00:17:09.669794202Z",
                "stdout",
                true,
                log_namespace,
            ),
            case(
                "2016-10-06T00:17:09.669794202Z stdout P Second line of the log entry 2",
                value!("Second line of the log entry 2"),
                "2016-10-06T00:17:09.669794202Z",
                "stdout",
                true,
                log_namespace,
            ),
            case(
                "2016-10-06T00:17:10.113242941Z stderr F Last line of the log entry 2",
                value!("Last line of the log entry 2"),
                "2016-10-06T00:17:10.113242941Z",
                "stderr",
                false,
                log_namespace,
            ),
            case(
                long_line,
                value!(long_message),
                "2016-10-06T00:17:10.113242941Z",
                "stdout",
                true,
                log_namespace,
            ),
            case(
                invalid_utf8_line,
                value!(Bytes::from_static(INVALID_UTF8_MESSAGE)),
                "2021-08-05T17:35:26.640507539Z",
                "stdout",
                true,
                log_namespace,
            ),
        ]
    }

    /// Runs the shared cases with the `Vector` namespace, where the raw line is the event itself.
    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    /// Runs the shared cases with the `Legacy` namespace.
    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }
}
303}