vector/sources/kubernetes_logs/
partial_events_merger.rs1#![deny(missing_docs)]
2
3use std::{
4 collections::HashMap,
5 time::{Duration, Instant},
6};
7
8use bytes::BytesMut;
9use futures::{Stream, StreamExt};
10use vector_lib::{
11 config::LogNamespace,
12 lookup::OwnedTargetPath,
13 stream::expiration_map::{Emitter, map_with_expiration},
14};
15use vrl::owned_value_path;
16
17use crate::{
18 event,
19 event::{Event, LogEvent, Value},
20 internal_events::KubernetesMergedLineTooBigError,
21 sources::kubernetes_logs::transform_utils::get_message_path,
22};
23
/// Name of the field whose string value is used to key the merge buckets.
///
/// It is looked up in the event itself for the legacy namespace and in the
/// source's metadata for the Vector namespace.
const FILE_KEY: &str = "file";

/// Default lifetime of a merge bucket: a bucket created at time `t` becomes
/// eligible for expiration at `t + EXPIRATION_TIME`.
const EXPIRATION_TIME: Duration = Duration::from_secs(30);
28
/// Mutable state carried across the stream while partial events are merged.
struct PartialEventMergeState {
    /// In-progress merges, keyed by the value of the event's file field
    /// (an empty string when the field is missing or not a string).
    buckets: HashMap<String, Bucket>,
    /// Optional upper bound, in bytes, on the size of a merged message.
    /// `None` disables the check.
    maybe_max_merged_line_bytes: Option<usize>,
}
33
34impl PartialEventMergeState {
35 fn add_event(
36 &mut self,
37 event: LogEvent,
38 file: &str,
39 message_path: &OwnedTargetPath,
40 expiration_time: Duration,
41 ) {
42 let mut bytes_mut = BytesMut::new();
43 if let Some(bucket) = self.buckets.get_mut(file) {
44 if bucket.exceeds_max_merged_line_limit {
46 return;
47 }
48
49 if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
52 (bucket.event.get_mut(message_path), event.get(message_path))
53 {
54 bytes_mut.extend_from_slice(prev_value);
55 bytes_mut.extend_from_slice(new_value);
56
57 if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes
59 && bytes_mut.len() > max_merged_line_bytes
60 {
61 bucket.exceeds_max_merged_line_limit = true;
62 emit!(KubernetesMergedLineTooBigError {
64 event: &Value::Bytes(new_value.clone()),
65 configured_limit: max_merged_line_bytes,
66 encountered_size_so_far: bytes_mut.len()
67 });
68 }
69
70 *prev_value = bytes_mut.freeze();
71 }
72 } else {
73 let mut exceeds_max_merged_line_limit = false;
76
77 if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
78 bytes_mut.extend_from_slice(event_bytes);
79 if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
80 exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
81
82 if exceeds_max_merged_line_limit {
83 emit!(KubernetesMergedLineTooBigError {
85 event: &Value::Bytes(event_bytes.clone()),
86 configured_limit: max_merged_line_bytes,
87 encountered_size_so_far: bytes_mut.len()
88 });
89 }
90 }
91 }
92
93 self.buckets.insert(
94 file.to_owned(),
95 Bucket {
96 event,
97 expiration: Instant::now() + expiration_time,
98 exceeds_max_merged_line_limit,
99 },
100 );
101 }
102 }
103
104 fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
105 self.buckets
106 .remove(file)
107 .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
108 .map(|bucket| bucket.event)
109 }
110
111 fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
112 let now = Instant::now();
113 self.buckets.retain(|_key, bucket| {
114 let expired = now >= bucket.expiration;
115 if expired && !bucket.exceeds_max_merged_line_limit {
116 emitter.emit(bucket.event.clone());
117 }
118 !expired
119 });
120 }
121
122 fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
123 for (_, bucket) in self.buckets.drain() {
124 if !bucket.exceeds_max_merged_line_limit {
125 emitter.emit(bucket.event);
126 }
127 }
128 }
129}
130
/// A merge in progress for a single bucket key.
struct Bucket {
    /// The first event seen for this key; its message grows as further
    /// fragments are appended.
    event: LogEvent,
    /// Instant from which the bucket is considered expired.
    expiration: Instant,
    /// Set once the message exceeded the configured size limit; such a bucket
    /// is dropped instead of being emitted.
    exceeds_max_merged_line_limit: bool,
}
136
137pub fn merge_partial_events(
138 stream: impl Stream<Item = Event> + 'static,
139 log_namespace: LogNamespace,
140 maybe_max_merged_line_bytes: Option<usize>,
141) -> impl Stream<Item = Event> {
142 merge_partial_events_with_custom_expiration(
143 stream,
144 log_namespace,
145 EXPIRATION_TIME,
146 maybe_max_merged_line_bytes,
147 )
148}
149
150fn merge_partial_events_with_custom_expiration(
152 stream: impl Stream<Item = Event> + 'static,
153 log_namespace: LogNamespace,
154 expiration_time: Duration,
155 maybe_max_merged_line_bytes: Option<usize>,
156) -> impl Stream<Item = Event> {
157 let partial_flag_path = match log_namespace {
158 LogNamespace::Vector => {
159 OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, event::PARTIAL))
160 }
161 LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(event::PARTIAL)),
162 };
163
164 let file_path = match log_namespace {
165 LogNamespace::Vector => {
166 OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, FILE_KEY))
167 }
168 LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(FILE_KEY)),
169 };
170
171 let state = PartialEventMergeState {
172 buckets: HashMap::new(),
173 maybe_max_merged_line_bytes,
174 };
175
176 let message_path = get_message_path(log_namespace);
177
178 map_with_expiration(
179 state,
180 stream.map(|e| e.into_log()),
181 Duration::from_secs(1),
182 move |state: &mut PartialEventMergeState,
183 event: LogEvent,
184 emitter: &mut Emitter<LogEvent>| {
185 let is_partial = event
187 .get(&partial_flag_path)
188 .and_then(|x| x.as_boolean())
189 .unwrap_or(false);
190
191 let file = event
192 .get(&file_path)
193 .and_then(|x| x.as_str())
194 .map(|x| x.to_string())
195 .unwrap_or_default();
196
197 state.add_event(event, &file, &message_path, expiration_time);
198 if !is_partial && let Some(log_event) = state.remove_event(&file) {
199 emitter.emit(log_event);
200 }
201 },
202 |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
203 state.emit_expired_events(emitter)
205 },
206 |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
207 state.flush_events(emitter);
209 },
210 )
211 .map(|e| e.into())
213}
214
#[cfg(test)]
mod test {
    use vector_lib::event::LogEvent;
    use vrl::value;

    use super::*;

    // A lone, non-partial event passes through unchanged.
    #[tokio::test]
    async fn merge_single_event_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1"))
        );
    }

    // A lone event larger than the limit is dropped.
    #[tokio::test]
    async fn merge_single_event_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // A partial event followed by a final one yields a single merged event.
    #[tokio::test]
    async fn merge_multiple_events_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    // Each fragment fits the limit on its own, but the merged line does not.
    #[tokio::test]
    async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // Both events are partial, so the merged line is only released by the
    // flush that happens when the input stream ends.
    #[tokio::test]
    async fn multiple_events_flush_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    // Same as above, but the merged line is over the limit and must not be
    // flushed.
    #[tokio::test]
    async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // Two partial events for different files never see their final fragment;
    // the input never ends either, so both can only come out via expiration.
    #[tokio::test]
    async fn multiple_events_expire_legacy() {
        let mut e_1 = LogEvent::from("test message");
        e_1.insert(FILE_KEY, "foo1");
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message");
        e_2.insert(FILE_KEY, "foo2");
        e_2.insert("_partial", true);

        let input_stream =
            futures::stream::iter([e_1.into(), e_2.into()]).chain(futures::stream::pending());

        let output_stream = merge_partial_events_with_custom_expiration(
            input_stream,
            LogNamespace::Legacy,
            Duration::from_secs(1),
            None,
        );

        let output: Vec<Event> = output_stream.take(2).collect().await;
        assert_eq!(output.len(), 2);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message"))
        );
        assert_eq!(
            output[1].as_log().get(".message"),
            Some(&value!("test message"))
        );
    }

    // Vector namespace: the file name lives in the source's metadata.
    #[tokio::test]
    async fn merge_single_event_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].as_log().get("."), Some(&value!("test message 1")));
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }

    // Vector namespace: the partial flag is read from metadata as well, and
    // the metadata of the first fragment is kept on the merged event.
    #[tokio::test]
    async fn merge_multiple_events_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, "_partial"),
            true,
        );
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let mut e_2 = LogEvent::from(value!("test message 2"));
        e_2.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get("."),
            Some(&value!("test message 1test message 2"))
        );
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }
}