vector/sources/kubernetes_logs/partial_events_merger.rs

#![deny(missing_docs)]

use std::{
    collections::HashMap,
    time::{Duration, Instant},
};

use bytes::BytesMut;
use futures::{Stream, StreamExt};
use vector_lib::{
    config::LogNamespace,
    lookup::OwnedTargetPath,
    stream::expiration_map::{Emitter, map_with_expiration},
};
use vrl::owned_value_path;

use crate::{
    event,
    event::{Event, LogEvent, Value},
    internal_events::KubernetesMergedLineTooBigError,
    sources::kubernetes_logs::transform_utils::get_message_path,
};

/// The key we use for the `file` field.
const FILE_KEY: &str = "file";

/// How long a partially merged event may keep accumulating before it is emitted as-is.
const EXPIRATION_TIME: Duration = Duration::from_secs(30);

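/// Accumulates partial log lines into complete events, keyed by the `file` the lines
/// were read from. A merge that grows past the configured byte limit is flagged and
/// later dropped instead of being emitted.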
struct PartialEventMergeState {
    buckets: HashMap<String, Bucket>,
    maybe_max_merged_line_bytes: Option<usize>,
}

impl PartialEventMergeState {
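    /// Folds `event` into the bucket for `file`, creating the bucket if this is the
    /// first partial line for that file. If the merged message exceeds the configured
    /// limit, the bucket is flagged so it is dropped rather than emitted.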
    fn add_event(
        &mut self,
        event: LogEvent,
        file: &str,
        message_path: &OwnedTargetPath,
        expiration_time: Duration,
    ) {
        let mut bytes_mut = BytesMut::new();
        if let Some(bucket) = self.buckets.get_mut(file) {
            // skip new partial events for a bucket that has already exceeded the size limit
            if bucket.exceeds_max_merged_line_limit {
                return;
            }

            // merging with an existing event

            if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
                (bucket.event.get_mut(message_path), event.get(message_path))
            {
                bytes_mut.extend_from_slice(prev_value);
                bytes_mut.extend_from_slice(new_value);

                // flag the event if it's bigger than the max allowed; it will be dropped instead of emitted
                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes
                    && bytes_mut.len() > max_merged_line_bytes
                {
                    bucket.exceeds_max_merged_line_limit = true;
                    // perf impact of clone should be minimal since being here means no further processing of this event will occur
                    emit!(KubernetesMergedLineTooBigError {
                        event: &Value::Bytes(new_value.clone()),
                        configured_limit: max_merged_line_bytes,
                        encountered_size_so_far: bytes_mut.len()
                    });
                }

                *prev_value = bytes_mut.freeze();
            }
        } else {
            // new event

            let mut exceeds_max_merged_line_limit = false;

            if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
                bytes_mut.extend_from_slice(event_bytes);
                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
                    exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;

                    if exceeds_max_merged_line_limit {
                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
                        emit!(KubernetesMergedLineTooBigError {
                            event: &Value::Bytes(event_bytes.clone()),
                            configured_limit: max_merged_line_bytes,
                            encountered_size_so_far: bytes_mut.len()
                        });
                    }
                }
            }

            self.buckets.insert(
                file.to_owned(),
                Bucket {
                    event,
                    expiration: Instant::now() + expiration_time,
                    exceeds_max_merged_line_limit,
                },
            );
        }
    }

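    /// Removes and returns the merged event for `file`, or `None` if there is no bucket
    /// for that file or the merged line exceeded the configured size limit (in which
    /// case the event is dropped).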
    fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
        self.buckets
            .remove(file)
            .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
            .map(|bucket| bucket.event)
    }

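    /// Drops buckets whose expiration deadline has passed, emitting them unless they
    /// exceeded the size limit, so a stalled stream of partial lines cannot hold an
    /// event back forever.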
    fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
        let now = Instant::now();
        self.buckets.retain(|_key, bucket| {
            let expired = now >= bucket.expiration;
            if expired && !bucket.exceeds_max_merged_line_limit {
                emitter.emit(bucket.event.clone());
            }
            !expired
        });
    }

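    /// Emits every remaining bucket (except those over the size limit); called when the
    /// source stream ends.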
    fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
        for (_, bucket) in self.buckets.drain() {
            if !bucket.exceeds_max_merged_line_limit {
                emitter.emit(bucket.event);
            }
        }
    }
}

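/// A single in-progress merge: the event accumulated so far, the deadline after which it
/// is emitted as-is, and whether it has grown past the configured size limit.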
struct Bucket {
    event: LogEvent,
    expiration: Instant,
    exceeds_max_merged_line_limit: bool,
}

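/// Merges consecutive partial events into complete ones, grouped by the file they were
/// read from. A merge is finalized when a non-partial line arrives, when its bucket
/// expires, or when the stream ends; merges larger than `maybe_max_merged_line_bytes`
/// are dropped.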
pub fn merge_partial_events(
    stream: impl Stream<Item = Event> + 'static,
    log_namespace: LogNamespace,
    maybe_max_merged_line_bytes: Option<usize>,
) -> impl Stream<Item = Event> {
    merge_partial_events_with_custom_expiration(
        stream,
        log_namespace,
        EXPIRATION_TIME,
        maybe_max_merged_line_bytes,
    )
}

// internal function that allows customizing the expiration time (for testing)
fn merge_partial_events_with_custom_expiration(
    stream: impl Stream<Item = Event> + 'static,
    log_namespace: LogNamespace,
    expiration_time: Duration,
    maybe_max_merged_line_bytes: Option<usize>,
) -> impl Stream<Item = Event> {
    let partial_flag_path = match log_namespace {
        LogNamespace::Vector => {
            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, event::PARTIAL))
        }
        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(event::PARTIAL)),
    };

    let file_path = match log_namespace {
        LogNamespace::Vector => {
            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, FILE_KEY))
        }
        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(FILE_KEY)),
    };

    let state = PartialEventMergeState {
        buckets: HashMap::new(),
        maybe_max_merged_line_bytes,
    };

    let message_path = get_message_path(log_namespace);

    map_with_expiration(
        state,
        stream.map(|e| e.into_log()),
        Duration::from_secs(1),
        move |state: &mut PartialEventMergeState,
              event: LogEvent,
              emitter: &mut Emitter<LogEvent>| {
            // called for each event: always fold it into its file's bucket, then emit
            // the merged result once a non-partial (terminating) line arrives
            let is_partial = event
                .get(&partial_flag_path)
                .and_then(|x| x.as_boolean())
                .unwrap_or(false);

            let file = event
                .get(&file_path)
                .and_then(|x| x.as_str())
                .map(|x| x.to_string())
                .unwrap_or_default();

            state.add_event(event, &file, &message_path, expiration_time);
            if !is_partial && let Some(log_event) = state.remove_event(&file) {
                emitter.emit(log_event);
            }
        },
        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
            // check for expired events
            state.emit_expired_events(emitter)
        },
        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
            // the source is ending, flush all pending events
            state.flush_events(emitter);
        },
    )
    // LogEvent -> Event
    .map(|e| e.into())
}

#[cfg(test)]
mod test {
    use vector_lib::event::LogEvent;
    use vrl::value;

    use super::*;

    #[tokio::test]
    async fn merge_single_event_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1"))
        );
    }

    #[tokio::test]
    async fn merge_single_event_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    #[tokio::test]
    async fn merge_multiple_events_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    #[tokio::test]
    async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        // 24 bytes is longer than the first message (14 bytes) but shorter than the two merged (28 bytes)
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    #[tokio::test]
    async fn multiple_events_flush_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    #[tokio::test]
    async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        // 24 bytes is longer than the first message (14 bytes) but shorter than the two merged (28 bytes)
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    #[tokio::test]
    async fn multiple_events_expire_legacy() {
        let mut e_1 = LogEvent::from("test message");
        e_1.insert(FILE_KEY, "foo1");
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message");
        e_2.insert(FILE_KEY, "foo2");
        e_2.insert("_partial", true);

        // an input stream that never ends
        let input_stream =
            futures::stream::iter([e_1.into(), e_2.into()]).chain(futures::stream::pending());

        let output_stream = merge_partial_events_with_custom_expiration(
            input_stream,
            LogNamespace::Legacy,
            Duration::from_secs(1),
            None,
        );

        let output: Vec<Event> = output_stream.take(2).collect().await;
        assert_eq!(output.len(), 2);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message"))
        );
        assert_eq!(
            output[1].as_log().get(".message"),
            Some(&value!("test message"))
        );
    }

    #[tokio::test]
    async fn merge_single_event_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].as_log().get("."), Some(&value!("test message 1")));
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }

    #[tokio::test]
    async fn merge_multiple_events_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, "_partial"),
            true,
        );
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let mut e_2 = LogEvent::from(value!("test message 2"));
        e_2.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get("."),
            Some(&value!("test message 1test message 2"))
        );
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }
}
411}