vector/sources/kubernetes_logs/
partial_events_merger.rs

1#![deny(missing_docs)]
2
3use bytes::BytesMut;
4use futures::{Stream, StreamExt};
5use std::collections::HashMap;
6use std::time::{Duration, Instant};
7use vector_lib::config::LogNamespace;
8use vector_lib::lookup::OwnedTargetPath;
9use vector_lib::stream::expiration_map::{map_with_expiration, Emitter};
10use vrl::owned_value_path;
11
12use crate::event;
13use crate::event::{Event, LogEvent, Value};
14use crate::internal_events::KubernetesMergedLineTooBigError;
15use crate::sources::kubernetes_logs::transform_utils::get_message_path;
16
/// The key we use for `file` field.
const FILE_KEY: &str = "file";

/// Default time a pending event is kept waiting for further fragments.
///
/// `merge_partial_events` passes this value as the expiration time; once a
/// bucket is at least this old, the periodic expiration check emits (or, if it
/// is over the size limit, discards) its event.
const EXPIRATION_TIME: Duration = Duration::from_secs(30);
21
/// Mutable state carried across the stream while partial events are merged.
struct PartialEventMergeState {
    /// Pending events, keyed by the `file` value read from each event
    /// (an empty string when the event has no usable `file` field).
    buckets: HashMap<String, Bucket>,
    /// Optional upper bound, in bytes, on the size of a merged message.
    /// `None` disables the check.
    maybe_max_merged_line_bytes: Option<usize>,
}
26
27impl PartialEventMergeState {
28    fn add_event(
29        &mut self,
30        event: LogEvent,
31        file: &str,
32        message_path: &OwnedTargetPath,
33        expiration_time: Duration,
34    ) {
35        let mut bytes_mut = BytesMut::new();
36        if let Some(bucket) = self.buckets.get_mut(file) {
37            // don't bother continuing to process new partial events that match existing ones that are already too big
38            if bucket.exceeds_max_merged_line_limit {
39                return;
40            }
41
42            // merging with existing event
43
44            if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) =
45                (bucket.event.get_mut(message_path), event.get(message_path))
46            {
47                bytes_mut.extend_from_slice(prev_value);
48                bytes_mut.extend_from_slice(new_value);
49
50                // drop event if it's bigger than max allowed
51                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
52                    if bytes_mut.len() > max_merged_line_bytes {
53                        bucket.exceeds_max_merged_line_limit = true;
54                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
55                        emit!(KubernetesMergedLineTooBigError {
56                            event: &Value::Bytes(new_value.clone()),
57                            configured_limit: max_merged_line_bytes,
58                            encountered_size_so_far: bytes_mut.len()
59                        });
60                    }
61                }
62
63                *prev_value = bytes_mut.freeze();
64            }
65        } else {
66            // new event
67
68            let mut exceeds_max_merged_line_limit = false;
69
70            if let Some(Value::Bytes(event_bytes)) = event.get(message_path) {
71                bytes_mut.extend_from_slice(event_bytes);
72                if let Some(max_merged_line_bytes) = self.maybe_max_merged_line_bytes {
73                    exceeds_max_merged_line_limit = bytes_mut.len() > max_merged_line_bytes;
74
75                    if exceeds_max_merged_line_limit {
76                        // perf impact of clone should be minimal since being here means no further processing of this event will occur
77                        emit!(KubernetesMergedLineTooBigError {
78                            event: &Value::Bytes(event_bytes.clone()),
79                            configured_limit: max_merged_line_bytes,
80                            encountered_size_so_far: bytes_mut.len()
81                        });
82                    }
83                }
84            }
85
86            self.buckets.insert(
87                file.to_owned(),
88                Bucket {
89                    event,
90                    expiration: Instant::now() + expiration_time,
91                    exceeds_max_merged_line_limit,
92                },
93            );
94        }
95    }
96
97    fn remove_event(&mut self, file: &str) -> Option<LogEvent> {
98        self.buckets
99            .remove(file)
100            .filter(|bucket| !bucket.exceeds_max_merged_line_limit)
101            .map(|bucket| bucket.event)
102    }
103
104    fn emit_expired_events(&mut self, emitter: &mut Emitter<LogEvent>) {
105        let now = Instant::now();
106        self.buckets.retain(|_key, bucket| {
107            let expired = now >= bucket.expiration;
108            if expired && !bucket.exceeds_max_merged_line_limit {
109                emitter.emit(bucket.event.clone());
110            }
111            !expired
112        });
113    }
114
115    fn flush_events(&mut self, emitter: &mut Emitter<LogEvent>) {
116        for (_, bucket) in self.buckets.drain() {
117            if !bucket.exceeds_max_merged_line_limit {
118                emitter.emit(bucket.event);
119            }
120        }
121    }
122}
123
/// A pending event for one file, together with its bookkeeping.
struct Bucket {
    /// The first event seen for the file; the messages of later fragments are
    /// appended to this event's message.
    event: LogEvent,
    /// Instant from which the periodic expiration check removes the bucket.
    expiration: Instant,
    /// Set once the (merged) message is larger than the configured limit;
    /// such a bucket is never emitted.
    exceeds_max_merged_line_limit: bool,
}
129
/// Merges partial events of `stream` into complete events.
///
/// Events are grouped by their `file` field; the location of that field, of
/// the partial flag and of the message depends on `log_namespace`. Pending
/// events that are not completed within `EXPIRATION_TIME` are emitted as-is.
///
/// When `maybe_max_merged_line_bytes` is `Some`, an event whose (merged)
/// message is larger than that many bytes is dropped instead of emitted.
pub fn merge_partial_events(
    stream: impl Stream<Item = Event> + 'static,
    log_namespace: LogNamespace,
    maybe_max_merged_line_bytes: Option<usize>,
) -> impl Stream<Item = Event> {
    merge_partial_events_with_custom_expiration(
        stream,
        log_namespace,
        EXPIRATION_TIME,
        maybe_max_merged_line_bytes,
    )
}
142
143// internal function that allows customizing the expiration time (for testing)
144fn merge_partial_events_with_custom_expiration(
145    stream: impl Stream<Item = Event> + 'static,
146    log_namespace: LogNamespace,
147    expiration_time: Duration,
148    maybe_max_merged_line_bytes: Option<usize>,
149) -> impl Stream<Item = Event> {
150    let partial_flag_path = match log_namespace {
151        LogNamespace::Vector => {
152            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, event::PARTIAL))
153        }
154        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(event::PARTIAL)),
155    };
156
157    let file_path = match log_namespace {
158        LogNamespace::Vector => {
159            OwnedTargetPath::metadata(owned_value_path!(super::Config::NAME, FILE_KEY))
160        }
161        LogNamespace::Legacy => OwnedTargetPath::event(owned_value_path!(FILE_KEY)),
162    };
163
164    let state = PartialEventMergeState {
165        buckets: HashMap::new(),
166        maybe_max_merged_line_bytes,
167    };
168
169    let message_path = get_message_path(log_namespace);
170
171    map_with_expiration(
172        state,
173        stream.map(|e| e.into_log()),
174        Duration::from_secs(1),
175        move |state: &mut PartialEventMergeState,
176              event: LogEvent,
177              emitter: &mut Emitter<LogEvent>| {
178            // called for each event
179            let is_partial = event
180                .get(&partial_flag_path)
181                .and_then(|x| x.as_boolean())
182                .unwrap_or(false);
183
184            let file = event
185                .get(&file_path)
186                .and_then(|x| x.as_str())
187                .map(|x| x.to_string())
188                .unwrap_or_default();
189
190            state.add_event(event, &file, &message_path, expiration_time);
191            if !is_partial {
192                if let Some(log_event) = state.remove_event(&file) {
193                    emitter.emit(log_event);
194                }
195            }
196        },
197        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
198            // check for expired events
199            state.emit_expired_events(emitter)
200        },
201        |state: &mut PartialEventMergeState, emitter: &mut Emitter<LogEvent>| {
202            // the source is ending, flush all pending events
203            state.flush_events(emitter);
204        },
205    )
206    // LogEvent -> Event
207    .map(|e| e.into())
208}
209
#[cfg(test)]
mod test {
    use super::*;
    use vector_lib::event::LogEvent;
    use vrl::value;

    // A single non-partial event passes through unchanged.
    #[tokio::test]
    async fn merge_single_event_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1"))
        );
    }

    // A single event larger than the limit is dropped.
    #[tokio::test]
    async fn merge_single_event_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(1));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // A partial event followed by a non-partial one is merged into one event.
    #[tokio::test]
    async fn merge_multiple_events_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    // The merged message exceeds the limit, so nothing is emitted.
    #[tokio::test]
    async fn merge_multiple_events_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        // 24 > length of first message but less than the two combined
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // Both events are partial: the merged event is only emitted by the flush
    // that happens when the input stream ends.
    #[tokio::test]
    async fn multiple_events_flush_legacy() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message 1test message 2"))
        );
    }

    // Both events are partial and their merge exceeds the limit: the flush at
    // the end of the stream must not emit the oversized event.
    #[tokio::test]
    async fn multiple_events_flush_legacy_exceeds_max_merged_line_limit() {
        let mut e_1 = LogEvent::from("test message 1");
        e_1.insert("foo", 1);
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message 2");
        e_2.insert("foo2", 1);
        e_2.insert("_partial", true);

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        // 24 > length of first message but less than the two combined
        let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, Some(24));

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 0);
    }

    // Two partial events from different files never get completed; each one is
    // emitted on its own once it expires.
    #[tokio::test]
    async fn multiple_events_expire_legacy() {
        let mut e_1 = LogEvent::from("test message");
        e_1.insert(FILE_KEY, "foo1");
        e_1.insert("_partial", true);

        let mut e_2 = LogEvent::from("test message");
        e_2.insert(FILE_KEY, "foo2");
        e_2.insert("_partial", true);

        // an input stream that never ends
        let input_stream =
            futures::stream::iter([e_1.into(), e_2.into()]).chain(futures::stream::pending());

        let output_stream = merge_partial_events_with_custom_expiration(
            input_stream,
            LogNamespace::Legacy,
            Duration::from_secs(1),
            None,
        );

        let output: Vec<Event> = output_stream.take(2).collect().await;
        assert_eq!(output.len(), 2);
        assert_eq!(
            output[0].as_log().get(".message"),
            Some(&value!("test message"))
        );
        assert_eq!(
            output[1].as_log().get(".message"),
            Some(&value!("test message"))
        );
    }

    // Vector namespace: a single non-partial event passes through unchanged.
    #[tokio::test]
    async fn merge_single_event_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(output[0].as_log().get("."), Some(&value!("test message 1")));
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }

    // Vector namespace: partial flag and file name are read from metadata.
    #[tokio::test]
    async fn merge_multiple_events_vector_namespace() {
        let mut e_1 = LogEvent::from(value!("test message 1"));
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, "_partial"),
            true,
        );
        e_1.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let mut e_2 = LogEvent::from(value!("test message 2"));
        e_2.insert(
            vrl::metadata_path!(super::super::Config::NAME, FILE_KEY),
            "foo1",
        );

        let input_stream = futures::stream::iter([e_1.into(), e_2.into()]);
        let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, None);

        let output: Vec<Event> = output_stream.collect().await;
        assert_eq!(output.len(), 1);
        assert_eq!(
            output[0].as_log().get("."),
            Some(&value!("test message 1test message 2"))
        );
        assert_eq!(
            output[0].as_log().get("%kubernetes_logs.file"),
            Some(&value!("foo1"))
        );
    }
}