vector/sources/kubernetes_logs/parser/
mod.rs1mod cri;
2mod docker;
3mod test_util;
4
5use vector_lib::config::LogNamespace;
6
7use crate::sources::kubernetes_logs::transform_utils::get_message_path;
8use crate::{
9 event::{Event, Value},
10 internal_events::KubernetesLogsFormatPickerEdgeCase,
11 transforms::{FunctionTransform, OutputBuffer},
12};
13
14#[derive(Clone, Debug)]
15enum ParserState {
16 Uninitialized,
18
19 Docker(docker::Docker),
21
22 Cri(cri::Cri),
24}
25
26#[derive(Clone, Debug)]
27pub struct Parser {
28 state: ParserState,
29 log_namespace: LogNamespace,
30}
31
32impl Parser {
33 pub const fn new(log_namespace: LogNamespace) -> Self {
34 Self {
35 state: ParserState::Uninitialized,
36 log_namespace,
37 }
38 }
39}
40
41impl FunctionTransform for Parser {
42 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
43 match &mut self.state {
44 ParserState::Uninitialized => {
45 let message_field = get_message_path(self.log_namespace);
46 let message = match event.as_log().get(&message_field) {
47 Some(message) => message,
48 None => {
49 emit!(KubernetesLogsFormatPickerEdgeCase {
50 what: "got an event without a message"
51 });
52 return;
53 }
54 };
55
56 let bytes = match message {
57 Value::Bytes(bytes) => bytes,
58 _ => {
59 emit!(KubernetesLogsFormatPickerEdgeCase {
60 what: "got an event with non-bytes message"
61 });
62 return;
63 }
64 };
65
66 self.state = if bytes.len() > 1 && bytes[0] == b'{' {
67 ParserState::Docker(docker::Docker::new(self.log_namespace))
68 } else {
69 ParserState::Cri(cri::Cri::new(self.log_namespace))
70 };
71 self.transform(output, event)
72 }
73 ParserState::Docker(t) => t.transform(output, event),
74 ParserState::Cri(t) => t.transform(output, event),
75 }
76 }
77}
78
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use vector_lib::lookup::event_path;
    use vrl::value;

    use super::*;
    use crate::{event::Event, event::LogEvent, test_util::trace_init};

    /// Valid fixtures from the Docker test suite followed by those from the
    /// CRI test suite, for the given namespace.
    fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        docker::tests::valid_cases(log_namespace)
            .into_iter()
            .chain(cri::tests::valid_cases(log_namespace))
            .collect()
    }

    /// Invalid fixtures; only the Docker test suite contributes any.
    fn invalid_cases() -> Vec<Bytes> {
        docker::tests::invalid_cases().into_iter().collect()
    }

    /// Feeds `input` to a fresh `Parser` and asserts that nothing is emitted.
    fn assert_no_output(log_namespace: LogNamespace, input: LogEvent) {
        let mut parser = Parser::new(log_namespace);
        let mut output = OutputBuffer::default();
        parser.transform(&mut output, input.into());

        assert!(output.is_empty(), "Expected no events: {output:?}");
    }

    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();

        let namespace = LogNamespace::Vector;
        test_util::test_parser(
            || Parser::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(namespace),
        );
    }

    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();

        let namespace = LogNamespace::Legacy;
        test_util::test_parser(
            || Parser::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(namespace),
        );
    }

    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        // Each case gets its own parser so format detection starts fresh.
        for bytes in invalid_cases() {
            assert_no_output(LogNamespace::Legacy, LogEvent::from(bytes));
        }
    }

    #[test]
    fn test_parsing_invalid_non_standard_events() {
        trace_init();

        // Legacy-namespace event whose `message` field holds an integer.
        let mut integer_message = LogEvent::default();
        integer_message.insert(event_path!("message"), 123);

        let cases = vec![
            // Default event; presumably carries no message field at all.
            (LogEvent::default(), LogNamespace::Legacy),
            // Event built from the integer 123 rather than from bytes.
            (LogEvent::from(value!(123)), LogNamespace::Vector),
            (integer_message, LogNamespace::Legacy),
        ];

        for (input, log_namespace) in cases {
            assert_no_output(log_namespace, input);
        }
    }
}