// vector/sources/kubernetes_logs/parser/cri.rs

1use chrono::{DateTime, Utc};
2use derivative::Derivative;
3use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
4use vector_lib::conversion;
5use vector_lib::lookup::path;
6
7use crate::sources::kubernetes_logs::transform_utils::get_message_path;
8use crate::{
9    event::{self, Event, Value},
10    internal_events::{
11        ParserConversionError, ParserMatchError, ParserMissingFieldError, DROP_EVENT,
12    },
13    sources::kubernetes_logs::Config,
14    transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Path under which the parsed CRI timestamp is stored; also reported as the field name when
/// the timestamp fails to convert.
const TIMESTAMP_KEY: &str = "timestamp";
/// Path under which the CRI stream name (`stdout` or `stderr`) is stored.
const STREAM_KEY: &str = "stream";
19
20/// Parser for the CRI log format.
21///
22/// Expects logs to arrive in a CRI log format.
23///
24/// CRI log format ([documentation][cri_log_format]) is a simple
25/// newline-separated text format. We rely on regular expressions to parse it.
26///
27/// Normalizes parsed data for consistency.
28///
29/// [cri_log_format]: https://github.com/kubernetes/community/blob/ee2abbf9dbfa4523b414f99a04ddc97bd38c74b2/contributors/design-proposals/node/kubelet-cri-logging.md
30#[derive(Clone, Derivative)]
31#[derivative(Debug)]
32pub(super) struct Cri {
33    log_namespace: LogNamespace,
34}
35
36impl Cri {
37    pub const fn new(log_namespace: LogNamespace) -> Self {
38        Self { log_namespace }
39    }
40}
41
42impl FunctionTransform for Cri {
43    fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
44        let message_path = get_message_path(self.log_namespace);
45
46        // Get the log field with the message, if it exists, and coerce it to bytes.
47        let log = event.as_mut_log();
48        let value = log.remove(&message_path).map(|s| s.coerce_to_bytes());
49        match value {
50            None => {
51                // The message field was missing, inexplicably. If we can't find the message field, there's nothing for
52                // us to actually decode, so there's no event we could emit, and so we just emit the error and return.
53                emit!(ParserMissingFieldError::<DROP_EVENT> {
54                    field: &message_path.to_string()
55                });
56                return;
57            }
58            Some(s) => match parse_log_line(&s) {
59                None => {
60                    emit!(ParserMatchError { value: &s[..] });
61                    return;
62                }
63                Some(parsed_log) => {
64                    // For all fields except `timestamp`, simply treat them as `Value::Bytes`. For
65                    // `timestamp`, however, we actually make sure we can convert it correctly and feed it
66                    // in as `Value::Timestamp`.
67
68                    // MESSAGE
69                    // Insert either directly into `.` or `log_schema().message_key()`,
70                    // overwriting the original "full" CRI log that included additional fields.
71                    drop(log.insert(&message_path, Value::Bytes(s.slice_ref(parsed_log.message))));
72
73                    // MULTILINE_TAG
74                    // If the MULTILINE_TAG is 'P' (partial), insert our generic `_partial` key.
75                    // This is safe to `unwrap()` as we've just ensured this value is a Value::Bytes
76                    // during the above capturing and mapping.
77                    if parsed_log.multiline_tag[0] == b'P' {
78                        self.log_namespace.insert_source_metadata(
79                            Config::NAME,
80                            log,
81                            Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
82                            path!(event::PARTIAL),
83                            true,
84                        );
85                    }
86
87                    // TIMESTAMP_TAG
88                    let ds = String::from_utf8_lossy(parsed_log.timestamp);
89                    match DateTime::parse_from_str(&ds, "%+") {
90                        Ok(dt) =>
91                        // Insert the TIMESTAMP_TAG parsed out of the CRI log, this is the timestamp of
92                        // when the runtime processed this message.
93                        {
94                            self.log_namespace.insert_source_metadata(
95                                Config::NAME,
96                                log,
97                                log_schema().timestamp_key().map(LegacyKey::Overwrite),
98                                path!(TIMESTAMP_KEY),
99                                Value::Timestamp(dt.with_timezone(&Utc)),
100                            )
101                        }
102                        Err(e) => {
103                            emit!(ParserConversionError {
104                                name: TIMESTAMP_KEY,
105                                error: conversion::Error::TimestampParse {
106                                    s: ds.to_string(),
107                                    source: e,
108                                },
109                            });
110                        }
111                    }
112
113                    // STREAM_TAG
114                    self.log_namespace.insert_source_metadata(
115                        Config::NAME,
116                        log,
117                        Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
118                        path!(STREAM_KEY),
119                        Value::Bytes(s.slice_ref(parsed_log.stream)),
120                    );
121                }
122            },
123        }
124
125        output.push(event);
126    }
127}
128
/// Borrowed views into the fields of one CRI log line, as produced by `parse_log_line`.
///
/// Every field is a sub-slice of the original line, so building one copies no bytes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ParsedLog<'a> {
    /// Runtime timestamp as raw bytes; not validated or parsed yet.
    timestamp: &'a [u8],
    /// Stream the line was written to: `stdout` or `stderr`.
    stream: &'a [u8],
    /// `P` for a partial line, `F` for a full line.
    multiline_tag: &'a [u8],
    /// Message content, with a single trailing newline (if any) removed.
    message: &'a [u8],
}
135
/// Reports whether `byte` is the single-space separator between CRI fields.
///
/// The byte is taken by reference so this function can be passed directly to
/// `Iterator::position` over a `slice::Iter<u8>`, whose items are `&u8`. That is why the
/// clippy lint against passing small `Copy` values by reference is allowed here.
#[allow(clippy::trivially_copy_pass_by_ref)]
#[inline]
const fn is_delimiter(byte: &u8) -> bool {
    matches!(*byte, b' ')
}
141
142/// Parses a CRI log line.
143///
144/// Equivalent to regex: `(?-u)^(?P<timestamp>.*) (?P<stream>(stdout|stderr)) (?P<multiline_tag>(P|F)) (?P<message>.*)(?P<new_line_tag>\n?)$`
145#[inline]
146fn parse_log_line(line: &[u8]) -> Option<ParsedLog> {
147    let rest = line;
148
149    let after_timestamp_pos = rest.iter().position(is_delimiter)?;
150    let (timestamp, rest) = rest.split_at(after_timestamp_pos + 1);
151    let timestamp = timestamp.split_last()?.1; // Trim the delimiter
152
153    let after_stream_pos = rest.iter().position(is_delimiter)?;
154    let (stream, rest) = rest.split_at(after_stream_pos + 1);
155    let stream = stream.split_last()?.1;
156    if stream != b"stdout".as_ref() && stream != b"stderr".as_ref() {
157        return None;
158    }
159
160    let after_multiline_tag_pos = rest.iter().position(is_delimiter)?;
161    let (multiline_tag, rest) = rest.split_at(after_multiline_tag_pos + 1);
162    let multiline_tag = multiline_tag.split_last()?.1;
163    if multiline_tag != b"F".as_ref() && multiline_tag != b"P".as_ref() {
164        return None;
165    }
166
167    let has_new_line_tag = !rest.is_empty() && *rest.last()? == b'\n';
168    let message = if has_new_line_tag {
169        // Remove the newline tag field, if it exists.
170        // For additional details, see https://github.com/vectordotdev/vector/issues/8606.
171        rest.split_last()?.1
172    } else {
173        rest
174    };
175
176    Some(ParsedLog {
177        timestamp,
178        stream,
179        multiline_tag,
180        message,
181    })
182}
183
#[cfg(test)]
pub mod tests {
    use bytes::Bytes;

    use super::{super::test_util, *};
    use crate::{event::LogEvent, test_util::trace_init};
    use vrl::value;

    /// Builds a string of `len` characters by cycling through `base` (empty if `base` is empty).
    fn make_long_string(base: &str, len: usize) -> String {
        base.chars().cycle().take(len).collect()
    }

    /// Shared test cases: raw CRI lines paired with the events the parser must produce.
    ///
    /// NOTE(review): this is `pub`, presumably so other parser tests can reuse these cases —
    /// confirm against callers before changing its signature.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        vec![
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout F The content of the log entry 1",
                ),
                vec![test_util::make_log_event(
                    value!("The content of the log entry 1"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            (
                Bytes::from("2016-10-06T00:17:09.669794202Z stdout P First line of log entry 2"),
                vec![test_util::make_log_event(
                    value!("First line of log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout P Second line of the log entry 2",
                ),
                vec![test_util::make_log_event(
                    value!("Second line of the log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            (
                Bytes::from("2016-10-06T00:17:10.113242941Z stderr F Last line of the log entry 2"),
                vec![test_util::make_log_event(
                    value!("Last line of the log entry 2"),
                    "2016-10-06T00:17:10.113242941Z",
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // A part of the partial message with a realistic length.
            (
                Bytes::from(
                    [
                        r"2016-10-06T00:17:10.113242941Z stdout P ",
                        make_long_string("very long message ", 16 * 1024).as_str(),
                    ]
                    .join(""),
                ),
                vec![test_util::make_log_event(
                    value!(make_long_string("very long message ", 16 * 1024)),
                    "2016-10-06T00:17:10.113242941Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            (
                // This is not valid UTF-8 string, ends with \n
                // 2021-08-05T17:35:26.640507539Z stdout P Hello World Привет Ми\xd1\n
                Bytes::from(vec![
                    50, 48, 50, 49, 45, 48, 56, 45, 48, 53, 84, 49, 55, 58, 51, 53, 58, 50, 54, 46,
                    54, 52, 48, 53, 48, 55, 53, 51, 57, 90, 32, 115, 116, 100, 111, 117, 116, 32,
                    80, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 32, 208, 159, 209,
                    128, 208, 184, 208, 178, 208, 181, 209, 130, 32, 208, 156, 208, 184, 209, 10,
                ]),
                vec![test_util::make_log_event(
                    value!(Bytes::from(vec![
                        72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 32, 208, 159, 209, 128,
                        208, 184, 208, 178, 208, 181, 209, 130, 32, 208, 156, 208, 184, 209,
                    ])),
                    "2021-08-05T17:35:26.640507539Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Runs the shared cases with the Vector namespace, where the input event is built from the
    /// raw line as its whole value.
    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    /// Runs the shared cases with the legacy namespace, building input events via
    /// `LogEvent::from(bytes)`.
    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }

    /// Lines with a missing separator or an unknown stream/tag must not parse; in the transform
    /// this is the path that reports a match error and drops the event.
    #[test]
    fn test_parsing_malformed_lines() {
        let malformed: &[&[u8]] = &[
            b"",
            b"2016-10-06T00:17:09.669794202Z",
            b"2016-10-06T00:17:09.669794202Z stdout",
            b"2016-10-06T00:17:09.669794202Z stdout F",
            b"2016-10-06T00:17:09.669794202Z stdin F message",
            b"2016-10-06T00:17:09.669794202Z stdout X message",
            b"2016-10-06T00:17:09.669794202Z  stdout F message",
        ];
        for &line in malformed {
            assert!(
                parse_log_line(line).is_none(),
                "expected no match for {:?}",
                String::from_utf8_lossy(line)
            );
        }
    }

    /// Fields are split on single spaces and exactly one trailing newline is stripped.
    #[test]
    fn test_parsing_fields() {
        let parsed = parse_log_line(b"2016-10-06T00:17:09.669794202Z stderr P  two  spaces \n\n")
            .expect("valid CRI line");
        assert_eq!(parsed.timestamp, b"2016-10-06T00:17:09.669794202Z");
        assert_eq!(parsed.stream, b"stderr");
        assert_eq!(parsed.multiline_tag, b"P");
        assert_eq!(parsed.message, b" two  spaces \n");

        let empty = parse_log_line(b"2016-10-06T00:17:09.669794202Z stdout F ")
            .expect("valid CRI line with an empty message");
        assert_eq!(empty.message, b"");
    }
}