1use chrono::{DateTime, Utc};
2use derivative::Derivative;
3use vector_lib::{
4 config::{LegacyKey, LogNamespace, log_schema},
5 conversion,
6 lookup::path,
7};
8
9use crate::{
10 event::{self, Event, Value},
11 internal_events::{
12 DROP_EVENT, ParserConversionError, ParserMatchError, ParserMissingFieldError,
13 },
14 sources::kubernetes_logs::{Config, transform_utils::get_message_path},
15 transforms::{FunctionTransform, OutputBuffer},
16};
17
/// Metadata key under which the stream name taken from the CRI log line is stored.
const STREAM_KEY: &str = "stream";
/// Metadata key under which the timestamp parsed from the CRI log line is stored.
const TIMESTAMP_KEY: &str = "timestamp";
20
21#[derive(Clone, Derivative)]
32#[derivative(Debug)]
33pub(super) struct Cri {
34 log_namespace: LogNamespace,
35}
36
37impl Cri {
38 pub const fn new(log_namespace: LogNamespace) -> Self {
39 Self { log_namespace }
40 }
41}
42
43impl FunctionTransform for Cri {
44 fn transform(&mut self, output: &mut OutputBuffer, mut event: Event) {
45 let message_path = get_message_path(self.log_namespace);
46
47 let log = event.as_mut_log();
49 let value = log.remove(&message_path).map(|s| s.coerce_to_bytes());
50 match value {
51 None => {
52 emit!(ParserMissingFieldError::<DROP_EVENT> {
55 field: &message_path.to_string()
56 });
57 return;
58 }
59 Some(s) => match parse_log_line(&s) {
60 None => {
61 emit!(ParserMatchError { value: &s[..] });
62 return;
63 }
64 Some(parsed_log) => {
65 drop(log.insert(&message_path, Value::Bytes(s.slice_ref(parsed_log.message))));
73
74 if parsed_log.multiline_tag[0] == b'P' {
79 self.log_namespace.insert_source_metadata(
80 Config::NAME,
81 log,
82 Some(LegacyKey::Overwrite(path!(event::PARTIAL))),
83 path!(event::PARTIAL),
84 true,
85 );
86 }
87
88 let ds = String::from_utf8_lossy(parsed_log.timestamp);
90 match DateTime::parse_from_str(&ds, "%+") {
91 Ok(dt) =>
92 {
95 self.log_namespace.insert_source_metadata(
96 Config::NAME,
97 log,
98 log_schema().timestamp_key().map(LegacyKey::Overwrite),
99 path!(TIMESTAMP_KEY),
100 Value::Timestamp(dt.with_timezone(&Utc)),
101 )
102 }
103 Err(e) => {
104 emit!(ParserConversionError {
105 name: TIMESTAMP_KEY,
106 error: conversion::Error::TimestampParse {
107 s: ds.to_string(),
108 source: e,
109 },
110 });
111 }
112 }
113
114 self.log_namespace.insert_source_metadata(
116 Config::NAME,
117 log,
118 Some(LegacyKey::Overwrite(path!(STREAM_KEY))),
119 path!(STREAM_KEY),
120 Value::Bytes(s.slice_ref(parsed_log.stream)),
121 );
122 }
123 },
124 }
125
126 output.push(event);
127 }
128}
129
/// Borrowed view of the four fields of a single log line.
///
/// Every field is a sub-slice of the line that was parsed; nothing is copied.
struct ParsedLog<'a> {
    /// Raw, not yet validated timestamp bytes (first field).
    timestamp: &'a [u8],
    /// Stream name (second field).
    stream: &'a [u8],
    /// Multiline tag (third field).
    multiline_tag: &'a [u8],
    /// Remainder of the line after the third delimiter.
    message: &'a [u8],
}
136
/// Returns `true` when `c` is the ASCII space that separates the fields of a log line.
///
/// Takes `&u8` rather than `u8` so it can be passed directly as a predicate to
/// slice and iterator methods that hand out references, hence the lint exemption.
#[allow(clippy::trivially_copy_pass_by_ref)]
#[inline]
const fn is_delimiter(c: &u8) -> bool {
    matches!(*c, b' ')
}
142
143#[inline]
147fn parse_log_line(line: &[u8]) -> Option<ParsedLog<'_>> {
148 let rest = line;
149
150 let after_timestamp_pos = rest.iter().position(is_delimiter)?;
151 let (timestamp, rest) = rest.split_at(after_timestamp_pos + 1);
152 let timestamp = timestamp.split_last()?.1; let after_stream_pos = rest.iter().position(is_delimiter)?;
155 let (stream, rest) = rest.split_at(after_stream_pos + 1);
156 let stream = stream.split_last()?.1;
157 if stream != b"stdout".as_ref() && stream != b"stderr".as_ref() {
158 return None;
159 }
160
161 let after_multiline_tag_pos = rest.iter().position(is_delimiter)?;
162 let (multiline_tag, rest) = rest.split_at(after_multiline_tag_pos + 1);
163 let multiline_tag = multiline_tag.split_last()?.1;
164 if multiline_tag != b"F".as_ref() && multiline_tag != b"P".as_ref() {
165 return None;
166 }
167
168 let has_new_line_tag = !rest.is_empty() && *rest.last()? == b'\n';
169 let message = if has_new_line_tag {
170 rest.split_last()?.1
173 } else {
174 rest
175 };
176
177 Some(ParsedLog {
178 timestamp,
179 stream,
180 multiline_tag,
181 message,
182 })
183}
184
#[cfg(test)]
pub mod tests {
    use bytes::Bytes;
    use vrl::value;

    use super::{super::test_util, *};
    use crate::{event::LogEvent, test_util::trace_init};

    /// Cycles through the characters of `base` until exactly `len` characters are produced.
    fn make_long_string(base: &str, len: usize) -> String {
        let mut repeated = String::with_capacity(len);
        repeated.extend(base.chars().cycle().take(len));
        repeated
    }

    /// Pairs of raw input lines and the events the parser is expected to produce for them.
    pub fn valid_cases(log_namespace: LogNamespace) -> Vec<(Bytes, Vec<Event>)> {
        // A partial line whose message is 16 KiB long.
        let long_message = make_long_string("very long message ", 16 * 1024);
        let long_line = format!("2016-10-06T00:17:10.113242941Z stdout P {long_message}");

        // "Hello World Привет Ми" followed by a lone 0xD1 lead byte, so the message is
        // not valid UTF-8. The input line additionally ends with a newline.
        let non_utf8_line = Bytes::from_static(
            b"2021-08-05T17:35:26.640507539Z stdout P Hello World \
              \xd0\x9f\xd1\x80\xd0\xb8\xd0\xb2\xd0\xb5\xd1\x82 \xd0\x9c\xd0\xb8\xd1\n",
        );
        let non_utf8_message = Bytes::from_static(
            b"Hello World \xd0\x9f\xd1\x80\xd0\xb8\xd0\xb2\xd0\xb5\xd1\x82 \xd0\x9c\xd0\xb8\xd1",
        );

        vec![
            // Full line on stdout.
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout F The content of the log entry 1",
                ),
                vec![test_util::make_log_event(
                    value!("The content of the log entry 1"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    false,
                    log_namespace,
                )],
            ),
            // Two partial lines on stdout.
            (
                Bytes::from("2016-10-06T00:17:09.669794202Z stdout P First line of log entry 2"),
                vec![test_util::make_log_event(
                    value!("First line of log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            (
                Bytes::from(
                    "2016-10-06T00:17:09.669794202Z stdout P Second line of the log entry 2",
                ),
                vec![test_util::make_log_event(
                    value!("Second line of the log entry 2"),
                    "2016-10-06T00:17:09.669794202Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Full line on stderr.
            (
                Bytes::from("2016-10-06T00:17:10.113242941Z stderr F Last line of the log entry 2"),
                vec![test_util::make_log_event(
                    value!("Last line of the log entry 2"),
                    "2016-10-06T00:17:10.113242941Z",
                    "stderr",
                    false,
                    log_namespace,
                )],
            ),
            // Very long partial line.
            (
                Bytes::from(long_line),
                vec![test_util::make_log_event(
                    value!(long_message),
                    "2016-10-06T00:17:10.113242941Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
            // Non-UTF-8 message with a trailing newline that must be stripped.
            (
                non_utf8_line,
                vec![test_util::make_log_event(
                    value!(non_utf8_message),
                    "2021-08-05T17:35:26.640507539Z",
                    "stdout",
                    true,
                    log_namespace,
                )],
            ),
        ]
    }

    /// Runs the shared cases with events built for the `Vector` namespace.
    #[test]
    fn test_parsing_valid_vector_namespace() {
        trace_init();
        let namespace = LogNamespace::Vector;
        test_util::test_parser(
            || Cri::new(namespace),
            |bytes| Event::Log(LogEvent::from(value!(bytes))),
            valid_cases(namespace),
        );
    }

    /// Runs the shared cases with events built for the `Legacy` namespace.
    #[test]
    fn test_parsing_valid_legacy_namespace() {
        trace_init();
        let namespace = LogNamespace::Legacy;
        test_util::test_parser(
            || Cri::new(namespace),
            |bytes| Event::Log(LogEvent::from(bytes)),
            valid_cases(namespace),
        );
    }
}