vector/sources/kubernetes_logs/parser/
mod.rs1mod cri;
2mod docker;
3mod test_util;
4
5use vector_lib::config::LogNamespace;
6
7use crate::{
8 event::{Event, Value},
9 internal_events::KubernetesLogsFormatPickerEdgeCase,
10 sources::kubernetes_logs::transform_utils::get_message_path,
11 transforms::{FunctionTransform, OutputBuffer},
12};
13
14#[derive(Clone, Debug)]
15enum ParserState {
16 Uninitialized,
18
19 Docker(docker::Docker),
21
22 Cri(cri::Cri),
24}
25
26#[derive(Clone, Debug)]
27pub struct Parser {
28 state: ParserState,
29 log_namespace: LogNamespace,
30}
31
32impl Parser {
33 pub const fn new(log_namespace: LogNamespace) -> Self {
34 Self {
35 state: ParserState::Uninitialized,
36 log_namespace,
37 }
38 }
39}
40
41impl FunctionTransform for Parser {
42 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
43 match &mut self.state {
44 ParserState::Uninitialized => {
45 let message_field = get_message_path(self.log_namespace);
46 let message = match event.as_log().get(&message_field) {
47 Some(message) => message,
48 None => {
49 emit!(KubernetesLogsFormatPickerEdgeCase {
50 what: "got an event without a message"
51 });
52 return;
53 }
54 };
55
56 let bytes = match message {
57 Value::Bytes(bytes) => bytes,
58 _ => {
59 emit!(KubernetesLogsFormatPickerEdgeCase {
60 what: "got an event with non-bytes message"
61 });
62 return;
63 }
64 };
65
66 self.state = if bytes.len() > 1 && bytes[0] == b'{' {
67 ParserState::Docker(docker::Docker::new(self.log_namespace))
68 } else {
69 ParserState::Cri(cri::Cri::new(self.log_namespace))
70 };
71 self.transform(output, event)
72 }
73 ParserState::Docker(t) => t.transform(output, event),
74 ParserState::Cri(t) => t.transform(output, event),
75 }
76 }
77}
78
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use vector_lib::lookup::event_path;
    use vrl::value;

    use super::*;
    use crate::{
        event::{Event, LogEvent},
        test_util::trace_init,
    };

    /// All valid cases of the `docker` parser followed by all valid cases of
    /// the `cri` parser, for the given namespace.
    fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        docker::tests::valid_cases(log_namespace)
            .into_iter()
            .chain(cri::tests::valid_cases(log_namespace))
            .collect()
    }

    /// Inputs that must not produce any output; currently only the `docker`
    /// parser contributes cases.
    fn invalid_cases() -> Vec<Bytes> {
        docker::tests::invalid_cases().into_iter().collect()
    }

    /// Runs `input` through a fresh parser and asserts that nothing was
    /// written to the output buffer.
    fn assert_nothing_emitted(input: LogEvent, log_namespace: LogNamespace) {
        let mut parser = Parser::new(log_namespace);
        let mut output = OutputBuffer::default();
        parser.transform(&mut output, input.into());

        assert!(output.is_empty(), "Expected no events: {output:?}");
    }

    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();

        let namespace = LogNamespace::Vector;
        test_util::test_parser(
            || Parser::new(LogNamespace::Vector),
            |raw| Event::Log(LogEvent::from(value!(raw))),
            valid_cases(namespace),
        );
    }

    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();

        let namespace = LogNamespace::Legacy;
        test_util::test_parser(
            || Parser::new(LogNamespace::Legacy),
            |raw| Event::Log(LogEvent::from(raw)),
            valid_cases(namespace),
        );
    }

    #[test]
    fn test_parsing_invalid_legacy_namespace() {
        trace_init();

        for raw in invalid_cases() {
            assert_nothing_emitted(LogEvent::from(raw), LogNamespace::Legacy);
        }
    }

    #[test]
    fn test_parsing_invalid_non_standard_events() {
        trace_init();

        // Default event, legacy namespace.
        let default_event = LogEvent::default();

        // Event built from a non-bytes value, vector namespace.
        let integer_event = LogEvent::from(value!(123));

        // `message` field holding a non-bytes value, legacy namespace.
        let mut integer_message = LogEvent::default();
        integer_message.insert(event_path!("message"), 123);

        let cases = [
            (default_event, LogNamespace::Legacy),
            (integer_event, LogNamespace::Vector),
            (integer_message, LogNamespace::Legacy),
        ];

        for (input, log_namespace) in cases {
            assert_nothing_emitted(input, log_namespace);
        }
    }
}