vector/transforms/window/
transform.rs

1use std::collections::VecDeque;
2
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6    conditions::Condition,
7    event::Event,
8    internal_events::WindowEventsDropped,
9    transforms::{FunctionTransform, OutputBuffer},
10};
11
12#[derive(Clone)]
13pub struct Window {
14    // Configuration parameters
15    forward_when: Option<Condition>,
16    flush_when: Condition,
17    num_events_before: usize,
18    num_events_after: usize,
19
20    // Internal variables
21    buffer: VecDeque<Event>,
22    events_counter: usize,
23    events_dropped: Registered<WindowEventsDropped>,
24    is_flushing: bool,
25}
26
27impl Window {
28    pub fn new(
29        forward_when: Option<Condition>,
30        flush_when: Condition,
31        num_events_before: usize,
32        num_events_after: usize,
33    ) -> crate::Result<Self> {
34        let buffer = VecDeque::with_capacity(num_events_before);
35
36        Ok(Window {
37            forward_when,
38            flush_when,
39            num_events_before,
40            num_events_after,
41            events_dropped: register!(WindowEventsDropped),
42            buffer,
43            events_counter: 0,
44            is_flushing: false,
45        })
46    }
47}
48
49impl FunctionTransform for Window {
50    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
51        let (pass, event) = match self.forward_when.as_ref() {
52            Some(condition) => {
53                let (result, event) = condition.check(event);
54                (result, event)
55            }
56            _ => (false, event),
57        };
58
59        let (flush, event) = self.flush_when.check(event);
60
61        if self.buffer.capacity() < self.num_events_before {
62            self.buffer.reserve(self.num_events_before);
63        }
64
65        if pass {
66            output.push(event);
67        } else if flush {
68            if self.num_events_before > 0 {
69                self.buffer.drain(..).for_each(|evt| output.push(evt));
70            }
71
72            self.events_counter = 0;
73            self.is_flushing = true;
74            output.push(event);
75        } else if self.is_flushing {
76            self.events_counter += 1;
77
78            if self.events_counter > self.num_events_after {
79                self.events_counter = 0;
80                self.is_flushing = false;
81                self.events_dropped.emit(Count(1));
82            } else {
83                output.push(event);
84            }
85        } else if self.buffer.len() >= self.num_events_before {
86            self.buffer.pop_front();
87            self.buffer.push_back(event);
88            self.events_dropped.emit(Count(1));
89        } else if self.num_events_before > 0 {
90            self.buffer.push_back(event);
91        } else {
92            self.events_dropped.emit(Count(1));
93        }
94    }
95}
96
97#[cfg(test)]
98mod test {
99    use std::ops::RangeInclusive;
100
101    use tokio::sync::{
102        mpsc,
103        mpsc::{Receiver, Sender},
104    };
105    use tokio_stream::wrappers::ReceiverStream;
106    use vrl::core::Value;
107
108    use crate::{
109        conditions::{AnyCondition, ConditionConfig, VrlConfig},
110        event::{Event, LogEvent},
111        test_util::components::assert_transform_compliance,
112        transforms::{test::create_topology, window::config::WindowConfig},
113    };
114
115    #[tokio::test]
116    async fn test_flush() {
117        assert_transform_compliance(async {
118            let flush_when = get_condition("flush");
119            let transform_config = get_transform_config(flush_when, None, 1, 0);
120
121            let (tx, rx) = mpsc::channel(1);
122            let (topology, mut out) =
123                create_topology(ReceiverStream::new(rx), transform_config).await;
124
125            send_event(&tx, "flush").await;
126            assert_event("flush", out.recv().await).await;
127
128            drop(tx);
129            topology.stop().await;
130
131            assert_eq!(out.recv().await, None);
132        })
133        .await;
134    }
135
136    #[tokio::test]
137    async fn test_pass() {
138        assert_transform_compliance(async {
139            let flush_when = get_condition("flush");
140            let forward_when = get_condition("forward");
141            let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0);
142
143            let (tx, rx) = mpsc::channel(1);
144            let (topology, mut out) =
145                create_topology(ReceiverStream::new(rx), transform_config).await;
146
147            send_event(&tx, "forward").await;
148            assert_event("forward", out.recv().await).await;
149
150            drop(tx);
151            topology.stop().await;
152
153            assert_eq!(out.recv().await, None);
154        })
155        .await;
156    }
157
158    #[tokio::test]
159    async fn test_10_in_50() {
160        assert_transform_compliance(async {
161            let flush_when = get_condition("flush");
162            let transform_config = get_transform_config(flush_when, None, 50, 0);
163
164            let (tx, rx) = mpsc::channel(1);
165            let (topology, mut out) =
166                create_topology(ReceiverStream::new(rx), transform_config).await;
167
168            send_events(&tx, generate_events(1..=10)).await;
169            send_event(&tx, "flush").await;
170
171            let mut expected: [&str; 11] = [
172                "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush",
173            ];
174
175            assert_events(&mut expected, &mut out).await;
176
177            drop(tx);
178            topology.stop().await;
179
180            assert_eq!(out.recv().await, None);
181        })
182        .await;
183    }
184
185    #[tokio::test]
186    async fn test_50_in_10() {
187        assert_transform_compliance(async {
188            let flush_when = get_condition("flush");
189            let transform_config = get_transform_config(flush_when, None, 10, 0);
190
191            let (tx, rx) = mpsc::channel(1);
192            let (topology, mut out) =
193                create_topology(ReceiverStream::new(rx), transform_config).await;
194
195            send_events(&tx, generate_events(1..=50)).await;
196            send_event(&tx, "flush").await;
197
198            let mut expected: [&str; 11] = [
199                "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
200            ];
201
202            assert_events(&mut expected, &mut out).await;
203
204            drop(tx);
205            topology.stop().await;
206
207            assert_eq!(out.recv().await, None);
208        })
209        .await;
210    }
211
212    #[tokio::test]
213    async fn test_before_and_after() {
214        assert_transform_compliance(async {
215            let flush_when = get_condition("flush");
216            let transform_config = get_transform_config(flush_when, None, 10, 5);
217
218            let (tx, rx) = mpsc::channel(1);
219            let (topology, mut out) =
220                create_topology(ReceiverStream::new(rx), transform_config).await;
221
222            send_events(&tx, generate_events(1..=50)).await;
223            send_event(&tx, "flush").await;
224            send_events(&tx, generate_events(51..=70)).await;
225
226            let mut expected: [&str; 16] = [
227                "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
228                "A51", "A52", "A53", "A54", "A55",
229            ];
230
231            assert_events(&mut expected, &mut out).await;
232
233            drop(tx);
234            topology.stop().await;
235
236            assert_eq!(out.recv().await, None);
237        })
238        .await;
239    }
240
241    #[tokio::test]
242    async fn test_flush_and_pass() {
243        assert_transform_compliance(async {
244            let flush_when = get_condition("flush");
245            let forward_when = get_condition("forward");
246            let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5);
247
248            let (tx, rx) = mpsc::channel(1);
249            let (topology, mut out) =
250                create_topology(ReceiverStream::new(rx), transform_config).await;
251
252            send_events(&tx, generate_events(1..=5)).await;
253            send_event(&tx, "forward").await;
254            send_events(&tx, generate_events(6..=10)).await;
255            send_event(&tx, "forward").await;
256            send_event(&tx, "flush").await;
257            send_event(&tx, "forward").await;
258            send_events(&tx, generate_events(11..=15)).await;
259            send_event(&tx, "forward").await;
260            send_events(&tx, generate_events(16..=20)).await;
261
262            let mut expected: [&str; 20] = [
263                "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08",
264                "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward",
265            ];
266
267            assert_events(&mut expected, &mut out).await;
268
269            drop(tx);
270            topology.stop().await;
271
272            assert_eq!(out.recv().await, None);
273        })
274        .await;
275    }
276
277    #[tokio::test]
278    async fn test_zero_before() {
279        assert_transform_compliance(async {
280            let flush_when = get_condition("flush");
281            let transform_config = get_transform_config(flush_when, None, 0, 5);
282
283            let (tx, rx) = mpsc::channel(1);
284            let (topology, mut out) =
285                create_topology(ReceiverStream::new(rx), transform_config).await;
286
287            send_events(&tx, generate_events(1..=50)).await;
288            send_event(&tx, "flush").await;
289            send_events(&tx, generate_events(51..=70)).await;
290
291            let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"];
292            assert_events(&mut expected, &mut out).await;
293
294            drop(tx);
295            topology.stop().await;
296
297            assert_eq!(out.recv().await, None);
298        })
299        .await;
300    }
301
302    #[tokio::test]
303    async fn test_zero_flush() {
304        assert_transform_compliance(async {
305            let flush_when = get_condition("flush");
306            let transform_config = get_transform_config(flush_when, None, 0, 0);
307
308            let (tx, rx) = mpsc::channel(1);
309            let (topology, mut out) =
310                create_topology(ReceiverStream::new(rx), transform_config).await;
311
312            send_events(&tx, generate_events(1..=50)).await;
313            send_event(&tx, "flush").await;
314            send_events(&tx, generate_events(51..=70)).await;
315
316            let mut expected: [&str; 1] = ["flush"];
317            assert_events(&mut expected, &mut out).await;
318
319            drop(tx);
320            topology.stop().await;
321
322            assert_eq!(out.recv().await, None);
323        })
324        .await;
325    }
326
327    #[tokio::test]
328    async fn test_zero_pass() {
329        assert_transform_compliance(async {
330            let flush_when = get_condition("flush");
331            let forward_when = get_condition("forward");
332            let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0);
333
334            let (tx, rx) = mpsc::channel(1);
335            let (topology, mut out) =
336                create_topology(ReceiverStream::new(rx), transform_config).await;
337
338            let events = generate_events(1..=50);
339            let more_events = generate_events(51..=70);
340
341            send_events(&tx, events).await;
342            send_event(&tx, "forward").await;
343            send_event(&tx, "flush").await;
344            send_events(&tx, more_events).await;
345
346            let mut expected: [&str; 2] = ["forward", "flush"];
347            assert_events(&mut expected, &mut out).await;
348
349            drop(tx);
350            topology.stop().await;
351
352            assert_eq!(out.recv().await, None);
353        })
354        .await;
355    }
356
357    const fn get_transform_config(
358        flush_when: AnyCondition,
359        forward_when: Option<AnyCondition>,
360        num_events_before: usize,
361        num_events_after: usize,
362    ) -> WindowConfig {
363        WindowConfig {
364            flush_when,
365            forward_when,
366            num_events_before,
367            num_events_after,
368        }
369    }
370
371    fn get_condition(message: &str) -> AnyCondition {
372        AnyCondition::from(ConditionConfig::Vrl(VrlConfig {
373            source: format!(r#".message == "{message}""#),
374            runtime: Default::default(),
375        }))
376    }
377
378    fn generate_events(range: RangeInclusive<i32>) -> Vec<Event> {
379        range
380            .map(|n| format!("A{n:02}"))
381            .map(|m| Event::from(LogEvent::from(m)))
382            .collect::<Vec<Event>>()
383    }
384
385    async fn send_events(tx: &Sender<Event>, events: Vec<Event>) {
386        for event in events {
387            tx.send(event).await.unwrap();
388        }
389    }
390
391    async fn send_event(tx: &Sender<Event>, message: &str) {
392        tx.send(Event::from(LogEvent::from(message))).await.unwrap();
393    }
394
395    async fn assert_event(message: &str, event: Option<Event>) {
396        assert_eq!(
397            &Value::from(message),
398            event.unwrap().as_log().get("message").unwrap()
399        );
400    }
401
402    async fn assert_events(messages: &mut [&str], out: &mut Receiver<Event>) {
403        for message in messages {
404            assert_event(message, out.recv().await).await;
405        }
406    }
407}