vector/sources/kubernetes_logs/parser/
mod.rs

1mod cri;
2mod docker;
3mod test_util;
4
5use vector_lib::config::LogNamespace;
6
7use crate::sources::kubernetes_logs::transform_utils::get_message_path;
8use crate::{
9    event::{Event, Value},
10    internal_events::KubernetesLogsFormatPickerEdgeCase,
11    transforms::{FunctionTransform, OutputBuffer},
12};
13
14#[derive(Clone, Debug)]
15enum ParserState {
16    /// Runtime has not yet been detected.
17    Uninitialized,
18
19    /// Docker runtime is being used.
20    Docker(docker::Docker),
21
22    /// CRI is being used.
23    Cri(cri::Cri),
24}
25
26#[derive(Clone, Debug)]
27pub struct Parser {
28    state: ParserState,
29    log_namespace: LogNamespace,
30}
31
32impl Parser {
33    pub const fn new(log_namespace: LogNamespace) -> Self {
34        Self {
35            state: ParserState::Uninitialized,
36            log_namespace,
37        }
38    }
39}
40
41impl FunctionTransform for Parser {
42    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
43        match &mut self.state {
44            ParserState::Uninitialized => {
45                let message_field = get_message_path(self.log_namespace);
46                let message = match event.as_log().get(&message_field) {
47                    Some(message) => message,
48                    None => {
49                        emit!(KubernetesLogsFormatPickerEdgeCase {
50                            what: "got an event without a message"
51                        });
52                        return;
53                    }
54                };
55
56                let bytes = match message {
57                    Value::Bytes(bytes) => bytes,
58                    _ => {
59                        emit!(KubernetesLogsFormatPickerEdgeCase {
60                            what: "got an event with non-bytes message"
61                        });
62                        return;
63                    }
64                };
65
66                self.state = if bytes.len() > 1 && bytes[0] == b'{' {
67                    ParserState::Docker(docker::Docker::new(self.log_namespace))
68                } else {
69                    ParserState::Cri(cri::Cri::new(self.log_namespace))
70                };
71                self.transform(output, event)
72            }
73            ParserState::Docker(t) => t.transform(output, event),
74            ParserState::Cri(t) => t.transform(output, event),
75        }
76    }
77}
78
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use vector_lib::lookup::event_path;
    use vrl::value;

    use super::*;
    use crate::{
        event::{Event, LogEvent},
        test_util::trace_init,
    };

    /// The picker must handle every valid case of both underlying parsers.
    fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        docker::tests::valid_cases(log_namespace)
            .into_iter()
            .chain(cri::tests::valid_cases(log_namespace))
            .collect()
    }

    /// Inputs that must yield no events; taken from the Docker parser's
    /// invalid cases.
    fn invalid_cases() -> Vec<Bytes> {
        docker::tests::invalid_cases().into_iter().collect()
    }

    /// Runs `input` through a fresh parser and asserts nothing was emitted.
    fn assert_no_output(log_namespace: LogNamespace, input: LogEvent) {
        let mut parser = Parser::new(log_namespace);
        let mut output = OutputBuffer::default();
        parser.transform(&mut output, input.into());

        assert!(output.is_empty(), "Expected no events: {output:?}");
    }

    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();
        test_util::test_parser(
            || Parser::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();
        test_util::test_parser(
            || Parser::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }

    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        for bytes in invalid_cases() {
            assert_no_output(LogNamespace::Legacy, LogEvent::from(bytes));
        }
    }

    #[test]
    fn test_parsing_invalid_non_standard_events() {
        trace_init();

        // Legacy-namespace event whose `message` field holds a number.
        let mut numeric_message = LogEvent::default();
        numeric_message.insert(event_path!("message"), 123);

        let cases = vec![
            // Event that has no `message` field at all.
            (LogEvent::default(), LogNamespace::Legacy),
            // Events whose message is not bytes.
            (LogEvent::from(value!(123)), LogNamespace::Vector),
            (numeric_message, LogNamespace::Legacy),
        ];

        for (input, log_namespace) in cases {
            assert_no_output(log_namespace, input);
        }
    }
}