vector/sources/kubernetes_logs/parser/
mod.rs

1mod cri;
2mod docker;
3mod test_util;
4
5use vector_lib::config::LogNamespace;
6
7use crate::{
8    event::{Event, Value},
9    internal_events::KubernetesLogsFormatPickerEdgeCase,
10    sources::kubernetes_logs::transform_utils::get_message_path,
11    transforms::{FunctionTransform, OutputBuffer},
12};
13
14#[derive(Clone, Debug)]
15enum ParserState {
16    /// Runtime has not yet been detected.
17    Uninitialized,
18
19    /// Docker runtime is being used.
20    Docker(docker::Docker),
21
22    /// CRI is being used.
23    Cri(cri::Cri),
24}
25
26#[derive(Clone, Debug)]
27pub struct Parser {
28    state: ParserState,
29    log_namespace: LogNamespace,
30}
31
32impl Parser {
33    pub const fn new(log_namespace: LogNamespace) -> Self {
34        Self {
35            state: ParserState::Uninitialized,
36            log_namespace,
37        }
38    }
39}
40
41impl FunctionTransform for Parser {
42    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
43        match &mut self.state {
44            ParserState::Uninitialized => {
45                let message_field = get_message_path(self.log_namespace);
46                let message = match event.as_log().get(&message_field) {
47                    Some(message) => message,
48                    None => {
49                        emit!(KubernetesLogsFormatPickerEdgeCase {
50                            what: "got an event without a message"
51                        });
52                        return;
53                    }
54                };
55
56                let bytes = match message {
57                    Value::Bytes(bytes) => bytes,
58                    _ => {
59                        emit!(KubernetesLogsFormatPickerEdgeCase {
60                            what: "got an event with non-bytes message"
61                        });
62                        return;
63                    }
64                };
65
66                self.state = if bytes.len() > 1 && bytes[0] == b'{' {
67                    ParserState::Docker(docker::Docker::new(self.log_namespace))
68                } else {
69                    ParserState::Cri(cri::Cri::new(self.log_namespace))
70                };
71                self.transform(output, event)
72            }
73            ParserState::Docker(t) => t.transform(output, event),
74            ParserState::Cri(t) => t.transform(output, event),
75        }
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use bytes::Bytes;
82    use vector_lib::lookup::event_path;
83    use vrl::value;
84
85    use super::*;
86    use crate::{
87        event::{Event, LogEvent},
88        test_util::trace_init,
89    };
90
91    /// Picker has to work for all test cases for underlying parsers.
92    fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
93        let mut valid_cases = vec![];
94        valid_cases.extend(docker::tests::valid_cases(log_namespace));
95        valid_cases.extend(cri::tests::valid_cases(log_namespace));
96        valid_cases
97    }
98
99    fn invalid_cases() -> Vec<Bytes> {
100        let mut invalid_cases = vec![];
101        invalid_cases.extend(docker::tests::invalid_cases());
102        invalid_cases
103    }
104
105    #[test]
106    fn test_parsing_valid_vector_namespace() {
107        trace_init();
108        test_util::test_parser(
109            || Parser::new(LogNamespace::Vector),
110            |bytes| Event::Log(LogEvent::from(value!(bytes))),
111            valid_cases(LogNamespace::Vector),
112        );
113    }
114
115    #[test]
116    fn test_parsing_valid_legacy_namespace() {
117        trace_init();
118        test_util::test_parser(
119            || Parser::new(LogNamespace::Legacy),
120            |bytes| Event::Log(LogEvent::from(bytes)),
121            valid_cases(LogNamespace::Legacy),
122        );
123    }
124
125    #[test]
126    fn test_parsing_invalid_legacy_namespace() {
127        trace_init();
128
129        let cases = invalid_cases();
130
131        for bytes in cases {
132            let mut parser = Parser::new(LogNamespace::Legacy);
133            let input = LogEvent::from(bytes);
134            let mut output = OutputBuffer::default();
135            parser.transform(&mut output, input.into());
136
137            assert!(output.is_empty(), "Expected no events: {output:?}");
138        }
139    }
140
141    #[test]
142    fn test_parsing_invalid_non_standard_events() {
143        trace_init();
144
145        let cases = vec![
146            // No `message` field.
147            (LogEvent::default(), LogNamespace::Legacy),
148            // Non-bytes `message` field.
149            (LogEvent::from(value!(123)), LogNamespace::Vector),
150            (
151                {
152                    let mut input = LogEvent::default();
153                    input.insert(event_path!("message"), 123);
154                    input
155                },
156                LogNamespace::Legacy,
157            ),
158        ];
159
160        for (input, log_namespace) in cases {
161            let mut parser = Parser::new(log_namespace);
162            let mut output = OutputBuffer::default();
163            parser.transform(&mut output, input.into());
164
165            assert!(output.is_empty(), "Expected no events: {output:?}");
166        }
167    }
168}