vector/sources/kubernetes_logs/
partial_events_merger.rs1#![deny(missing_docs)]
2
3use bytes::BytesMut;
4use futures::{Stream, StreamExt};
5use std::collections::HashMap;
6use std::time::{Duration, Instant};
7use vector_lib::config::LogNamespace;
8use vector_lib::lookup::OwnedTargetPath;
9use vector_lib::stream::expiration_map::{map_with_expiration, Emitter};
10use vrl::owned_value_path;
11
12use crate::event;
13use crate::event::{Event, LogEvent, Value};
14use crate::internal_events::KubernetesMergedLineTooBigError;
15use crate::sources::kubernetes_logs::transform_utils::get_message_path;
16
/// Name of the field holding the file an event was read from; its value is the merge key.
const FILE_KEY: &str = "file";
19
/// Default lifetime of a pending (still partial) event before it counts as expired.
const EXPIRATION_TIME: Duration = Duration::from_secs(30);
21
/// State carried between stream callbacks while partial events are being merged.
struct PartialEventMergeState {
    /// Pending events, keyed by the file value read from each incoming event.
    buckets: HashMap<String, Bucket>,
    /// Optional cap, in bytes, on a merged message; `None` disables the check.
    maybe_max_merged_line_bytes: Option<usize>,
}
26
27impl PartialEventMergeState {
28 fn add_event(
29 &mut self,
30 event: LogEvent,
31 file: &str,
32 message_path: &OwnedTargetPath,
33 expiration_time: Duration,
34 ) {
35 let mut bytes_mut = BytesMut::new();
36 if let Some(bucket) = self.buckets.get_mut(file) {
37 if bucket.exceeds_max_merged_line_limit {
39 return;
40 }
41
42 if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
45 (bucket.event.get_mut(message_path), event.get(message_path))
46 {
47 bytes_mut.extend_from_slice(prev_value);
48 bytes_mut.extend_from_slice(new_value);
49
50 if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
52 if bytes_mut.len() > max_merged_line_bytes {
53 bucket.exceeds_max_merged_line_limit = true;
54 emit!(KubernetesMergedLineTooBigError {
56 event: &Value::Bytes(new_value.clone()),
57 configured_limit: max_merged_line_bytes,
58 encountered_size_so_far: bytes_mut.len()
59 });
60 }
61 }
62
63 *prev_value = bytes_mut.freeze();
64 }
65 } else {
66 let mut exceeds_max_merged_line_limit = false;
69
70 if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
71 bytes_mut.extend_from_slice(event_bytes);
72 if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
73 exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
74
75 if exceeds_max_merged_line_limit {
76 emit!(KubernetesMergedLineTooBigError {
78 event: &Value::Bytes(event_bytes.clone()),
79 configured_limit: max_merged_line_bytes,
80 encountered_size_so_far: bytes_mut.len()
81 });
82 }
83 }
84 }
85
86 self.buckets.insert(
87 file.to_owned(),
88 Bucket {
89 event,
90 expiration: Instant::now() + expiration_time,
91 exceeds_max_merged_line_limit,
92 },
93 );
94 }
95 }
96
97 fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
98 self.buckets
99 .remove(file)
100 .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
101 .map(|bucket| bucket.event)
102 }
103
104 fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
105 let now = Instant::now();
106 self.buckets.retain(|_key, bucket| {
107 let expired = now >= bucket.expiration;
108 if expired && !bucket.exceeds_max_merged_line_limit {
109 emitter.emit(bucket.event.clone());
110 }
111 !expired
112 });
113 }
114
115 fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
116 for (_, bucket) in self.buckets.drain() {
117 if !bucket.exceeds_max_merged_line_limit {
118 emitter.emit(bucket.event);
119 }
120 }
121 }
122}
123
/// A pending event for a single file, together with its bookkeeping data.
struct Bucket {
    /// First event seen for the file; later fragments are appended to its message.
    event: LogEvent,
    /// Instant from which the bucket is considered expired.
    expiration: Instant,
    /// Set once the message exceeded the configured byte limit; flagged buckets
    /// are never emitted.
    exceeds_max_merged_line_limit: bool,
}
129
130pub fn merge_partial_events(
131 stream: impl Stream<Item = Event> + 'static,
132 log_namespace: LogNamespace,
133 maybe_max_merged_line_bytes: Option<usize>,
134) -> impl Stream<Item = Event> {
135 merge_partial_events_with_custom_expiration(
136 stream,
137 log_namespace,
138 EXPIRATION_TIME,
139 maybe_max_merged_line_bytes,
140 )
141}
142
143fn merge_partial_events_with_custom_expiration(
145 stream: impl Stream<Item = Event> + 'static,
146 log_namespace: LogNamespace,
147 expiration_time: Duration,
148 maybe_max_merged_line_bytes: Option<usize>,
149) -> impl Stream<Item = Event> {
150 let partial_flag_path = match log_namespace {
151 LogNamespace::Vector => {
152 OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, event::PARTIAL))
153 }
154 LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(event::PARTIAL)),
155 };
156
157 let file_path = match log_namespace {
158 LogNamespace::Vector => {
159 OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, FILE_KEY))
160 }
161 LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(FILE_KEY)),
162 };
163
164 let state = PartialEventMergeState {
165 buckets: HashMap::new(),
166 maybe_max_merged_line_bytes,
167 };
168
169 let message_path = get_message_path(log_namespace);
170
171 map_with_expiration(
172 state,
173 stream.map(|e| e.into_log()),
174 Duration::from_secs(1),
175 move |state: &mut PartialEventMergeState,
176 event: LogEvent,
177 emitter: &mut Emitter<LogEvent>| {
178 let is_partial = event
180 .get(&partial_flag_path)
181 .and_then(|x| x.as_boolean())
182 .unwrap_or(false);
183
184 let file = event
185 .get(&file_path)
186 .and_then(|x| x.as_str())
187 .map(|x| x.to_string())
188 .unwrap_or_default();
189
190 state.add_event(event, &file, &message_path, expiration_time);
191 if !is_partial {
192 if let Some(log_event) = state.remove_event(&file) {
193 emitter.emit(log_event);
194 }
195 }
196 },
197 |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
198 state.emit_expired_events(emitter)
200 },
201 |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
202 state.flush_events(emitter);
204 },
205 )
206 .map(|e| e.into())
208}
209
#[cfg(test)]
mod test {
    use super::*;
    use vector_lib::event::LogEvent;
    use vrl::value;

    /// A single complete event passes through unchanged.
    #[tokio::test]
    async fn merge_single_event_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1"))
        );
    }

    /// A single complete event larger than the limit is dropped.
    #[tokio::test]
    async fn merge_single_event_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    /// A partial event followed by a complete one yields one merged event.
    #[tokio::test]
    async fn merge_multiple_events_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    /// The first fragment fits the limit, the merged message does not: nothing is emitted.
    #[tokio::test]
    async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    /// Two partial events with no terminating event are merged and emitted by the
    /// flush that happens when the input stream ends.
    #[tokio::test]
    async fn multiple_events_flush_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        // Both events must be partial so that only the final flush can emit them.
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    /// Same as above, but the merged message exceeds the limit, so the flush emits nothing.
    #[tokio::test]
    async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        // Both events must be partial so that only the final flush could emit them.
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    /// Partial events of two different files are each emitted once they expire,
    /// even though the input stream never ends.
    #[tokio::test]
    async fn multiple_events_expire_legacy() {
        let mut e_1 = LogEvent::from("test message");
        e_1.insert(FILE_KEY, "foo1");
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message");
        e_2.insert(FILE_KEY, "foo2");
        // Both events must be partial so that each one is emitted through expiration.
        e_2.insert("_partial", true);

        // The trailing `pending()` keeps the stream open so that no flush is triggered.
        let input_stream =
            futures::stream::iter([e_1.into(), e_2.into()]).chain(futures::stream::pending());

        let output_stream = merge_partial_events_with_custom_expiration(
            input_stream,
            LogNamespace::Legacy,
            Duration::from_secs(1),
            None,
        );

        let output: Vec<Event> = output_stream.take(2).collect().await;
        assert_eq!(output.len(), 2);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message"))
        );
        assert_eq!(
            output[1].as_log().get(".message"),
            Some(&value!("test message"))
        );
    }

    /// A single complete event passes through unchanged under the Vector namespace.
    #[tokio::test]
    async fn merge_single_event_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].as_log().get("."), Some(&value!("test message 1")));
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }

    /// A partial event followed by a complete one of the same file is merged
    /// under the Vector namespace, where flag and file live in metadata.
    #[tokio::test]
    async fn merge_multiple_events_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, "_partial"),
            true,
        );
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let mut e_2 = LogEvent::from(value!("test message 2"));
        e_2.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get("."),
            Some(&value!("test message 1test message 2"))
        );
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }
}