1use chrono::{DateTime, Utc};
2use derivative::Derivative;
3use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
4use vector_lib::conversion;
5use vector_lib::lookup::path;
6
7use crate::sources::kubernetes_logs::transform_utils::get_message_path;
8use crate::{
9 event::{self, Event, Value},
10 internal_events::{
11 ParserConversionError, ParserMatchError, ParserMissingFieldError, DROP_EVENT,
12 },
13 sources::kubernetes_logs::Config,
14 transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Key used both as the legacy field name and as the source-metadata path for
/// the CRI output stream (`stdout` / `stderr`).
const STREAM_KEY: &str = "stream";
/// Source-metadata path for the parsed CRI timestamp; also the field name
/// reported in `ParserConversionError` when the timestamp fails to parse.
const TIMESTAMP_KEY: &str = "timestamp";
19
/// Parser for container log lines written in the CRI log format
/// (`<timestamp> <stream> <tag> <message>`).
///
/// Used as a [`FunctionTransform`]: each event's message is split into its
/// payload plus timestamp, stream and partial-line metadata.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub(super) struct Cri {
    /// Namespace passed to `get_message_path` and `insert_source_metadata`;
    /// selects where the message and the parsed metadata are written.
    log_namespace: LogNamespace,
}
35
36impl Cri {
37 pub const fn new(log_namespace: LogNamespace) -> Self {
38 Self { log_namespace }
39 }
40}
41
42impl FunctionTransform for Cri {
43 fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
44 let message_path = get_message_path(self.log_namespace);
45
46 let log = event.as_mut_log();
48 let value = log.remove(&message_path).map(|s| s.coerce_to_bytes());
49 match value {
50 None => {
51 emit!(ParserMissingFieldError::<DROP_EVENT> {
54 field: &message_path.to_string()
55 });
56 return;
57 }
58 Some(s) => match parse_log_line(&s) {
59 None => {
60 emit!(ParserMatchError { value: &s[..] });
61 return;
62 }
63 Some(parsed_log) => {
64 drop(log.insert(&message_path, Value::Bytes(s.slice_ref(parsed_log.message))));
72
73 if parsed_log.multiline_tag[0] == b'P' {
78 self.log_namespace.insert_source_metadata(
79 Config::NAME,
80 log,
81 Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
82 path!(event::PARTIAL),
83 true,
84 );
85 }
86
87 let ds = String::from_utf8_lossy(parsed_log.timestamp);
89 match DateTime::parse_from_str(&ds, "%+") {
90 Ok(dt) =>
91 {
94 self.log_namespace.insert_source_metadata(
95 Config::NAME,
96 log,
97 log_schema().timestamp_key().map(LegacyKey::Overwrite),
98 path!(TIMESTAMP_KEY),
99 Value::Timestamp(dt.with_timezone(&Utc)),
100 )
101 }
102 Err(e) => {
103 emit!(ParserConversionError {
104 name: TIMESTAMP_KEY,
105 error: conversion::Error::TimestampParse {
106 s: ds.to_string(),
107 source: e,
108 },
109 });
110 }
111 }
112
113 self.log_namespace.insert_source_metadata(
115 Config::NAME,
116 log,
117 Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
118 path!(STREAM_KEY),
119 Value::Bytes(s.slice_ref(parsed_log.stream)),
120 );
121 }
122 },
123 }
124
125 output.push(event);
126 }
127}
128
/// The components of a single CRI log line, each borrowed from the raw line.
struct ParsedLog<'a> {
    /// Timestamp text exactly as it appeared; not parsed or validated here.
    timestamp: &'a [u8],
    /// Output stream name; `parse_log_line` only accepts `stdout` or `stderr`.
    stream: &'a [u8],
    /// Line tag; `parse_log_line` only accepts `F` or `P` (`P` marks a partial line).
    multiline_tag: &'a [u8],
    /// Log payload with one trailing `\n` removed, if the line ended with one.
    message: &'a [u8],
}
135
/// Reports whether `c` is the single space byte that separates CRI fields.
///
/// The byte is taken by reference so the function can be passed directly as a
/// predicate to iterator/slice methods, which hand out `&u8`.
#[allow(clippy::trivially_copy_pass_by_ref)] // signature must be `FnMut(&u8) -> bool`
#[inline]
const fn is_delimiter(c: &u8) -> bool {
    matches!(*c, b' ')
}
141
142#[inline]
146fn parse_log_line(line: &[u8]) -> Option<ParsedLog> {
147 let rest = line;
148
149 let after_timestamp_pos = rest.iter().position(is_delimiter)?;
150 let (timestamp, rest) = rest.split_at(after_timestamp_pos + 1);
151 let timestamp = timestamp.split_last()?.1; let after_stream_pos = rest.iter().position(is_delimiter)?;
154 let (stream, rest) = rest.split_at(after_stream_pos + 1);
155 let stream = stream.split_last()?.1;
156 if stream != b"stdout".as_ref() && stream != b"stderr".as_ref() {
157 return None;
158 }
159
160 let after_multiline_tag_pos = rest.iter().position(is_delimiter)?;
161 let (multiline_tag, rest) = rest.split_at(after_multiline_tag_pos + 1);
162 let multiline_tag = multiline_tag.split_last()?.1;
163 if multiline_tag != b"F".as_ref() && multiline_tag != b"P".as_ref() {
164 return None;
165 }
166
167 let has_new_line_tag = !rest.is_empty() && *rest.last()? == b'\n';
168 let message = if has_new_line_tag {
169 rest.split_last()?.1
172 } else {
173 rest
174 };
175
176 Some(ParsedLog {
177 timestamp,
178 stream,
179 multiline_tag,
180 message,
181 })
182}
183
/// Unit tests for the CRI parser.
#[cfg(test)]
pub mod tests {
    use bytes::Bytes;

    use super::{super::test_util, *};
    use crate::{event::LogEvent, test_util::trace_init};
    use vrl::value;

    /// Builds a string of exactly `len` characters by cycling through `base`.
    fn make_long_string(base: &str, len: usize) -> String {
        base.chars().cycle().take(len).collect()
    }

    /// Well-formed CRI lines paired with the events the parser should emit for
    /// the given `log_namespace`.
    ///
    /// NOTE(review): exposed as `pub`, presumably so other parser tests can reuse
    /// these cases — confirm against callers.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        vec![
            // Full (`F`) line on stdout: not partial.
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout F The content of the log entry 1",
                ),
                vec![test_util::make_log_event(
                    value!("The content of the log entry 1"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            // Partial (`P`) lines are flagged as partial.
            (
                Bytes::from("2016-10-06T00:17:09.669794202Z stdout P First line of log entry 2"),
                vec![test_util::make_log_event(
                    value!("First line of log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout P Second line of the log entry 2",
                ),
                vec![test_util::make_log_event(
                    value!("Second line of the log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Full line on stderr: the stream is carried through.
            (
                Bytes::from("2016-10-06T00:17:10.113242941Z stderr F Last line of the log entry 2"),
                vec![test_util::make_log_event(
                    value!("Last line of the log entry 2"),
                    "2016-10-06T00:17:10.113242941Z",
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // Partial line with a 16 * 1024-character payload.
            (
                Bytes::from(
                    [
                        r"2016-10-06T00:17:10.113242941Z stdout P ",
                        make_long_string("very long message ", 16 * 1024).as_str(),
                    ]
                    .join(""),
                ),
                vec![test_util::make_log_event(
                    value!(make_long_string("very long message ", 16 * 1024)),
                    "2016-10-06T00:17:10.113242941Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Raw bytes spelling
            // "2021-08-05T17:35:26.640507539Z stdout P Hello World Привет Ми"
            // where the final character is cut after its first UTF-8 byte (209),
            // followed by `\n` (10). The payload must be preserved byte-for-byte
            // (invalid UTF-8 included) with only the trailing newline removed.
            (
                Bytes::from(vec![
                    50, 48, 50, 49, 45, 48, 56, 45, 48, 53, 84, 49, 55, 58, 51, 53, 58, 50, 54, 46,
                    54, 52, 48, 53, 48, 55, 53, 51, 57, 90, 32, 115, 116, 100, 111, 117, 116, 32,
                    80, 32, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 32, 208, 159, 209,
                    128, 208, 184, 208, 178, 208, 181, 209, 130, 32, 208, 156, 208, 184, 209, 10,
                ]),
                vec![test_util::make_log_event(
                    value!(Bytes::from(vec![
                        72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 32, 208, 159, 209, 128,
                        208, 184, 208, 178, 208, 181, 209, 130, 32, 208, 156, 208, 184, 209,
                    ])),
                    "2021-08-05T17:35:26.640507539Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Runs every valid case with the Vector namespace; each input event is built
    /// from `value!(bytes)`.
    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Vector),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(LogNamespace::Vector),
        );
    }

    /// Runs every valid case with the legacy namespace; each input event is built
    /// with `LogEvent::from(bytes)`.
    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();
        test_util::test_parser(
            || Cri::new(LogNamespace::Legacy),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(LogNamespace::Legacy),
        );
    }
}