vector/transforms/window/
transform.rs

1use std::collections::VecDeque;
2
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6    conditions::Condition,
7    event::Event,
8    internal_events::WindowEventsDropped,
9    transforms::{FunctionTransform, OutputBuffer},
10};
11
/// State of the window transform: the configured conditions and limits plus
/// the buffer and counters that `transform` updates for every event.
#[derive(Clone)]
pub struct Window {
    // Configuration parameters
    /// Optional condition; an event matching it is pushed to the output
    /// right away. When `None`, no event is treated as matching.
    forward_when: Option<Condition>,
    /// Condition that triggers a flush: buffered events are emitted, followed
    /// by the matching event itself.
    flush_when: Condition,
    /// Maximum number of events held in `buffer` ahead of a flush.
    num_events_before: usize,
    /// Number of events passed straight to the output after a flush.
    num_events_after: usize,

    // Internal variables
    /// Most recent non-matching events, oldest at the front.
    buffer: VecDeque<Event>,
    /// Events seen since the last flush while `is_flushing` is set.
    events_counter: usize,
    /// Registered handle used to emit a count for events that are discarded.
    events_dropped: Registered<WindowEventsDropped>,
    /// `true` from a flush until more than `num_events_after` events followed it.
    is_flushing: bool,
}
26
27impl Window {
28    pub fn new(
29        forward_when: Option<Condition>,
30        flush_when: Condition,
31        num_events_before: usize,
32        num_events_after: usize,
33    ) -> crate::Result<Self> {
34        let buffer = VecDeque::with_capacity(num_events_before);
35
36        Ok(Window {
37            forward_when,
38            flush_when,
39            num_events_before,
40            num_events_after,
41            events_dropped: register!(WindowEventsDropped),
42            buffer,
43            events_counter: 0,
44            is_flushing: false,
45        })
46    }
47}
48
49impl FunctionTransform for Window {
50    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
51        let (pass, event) = match self.forward_when.as_ref() {
52            Some(condition) => {
53                let (result, event) = condition.check(event);
54                (result, event)
55            }
56            _ => (false, event),
57        };
58
59        let (flush, event) = self.flush_when.check(event);
60
61        if self.buffer.capacity() < self.num_events_before {
62            self.buffer.reserve(self.num_events_before);
63        }
64
65        if pass {
66            output.push(event);
67        } else if flush {
68            if self.num_events_before > 0 {
69                self.buffer.drain(..).for_each(|evt| output.push(evt));
70            }
71
72            self.events_counter = 0;
73            self.is_flushing = true;
74            output.push(event);
75        } else if self.is_flushing {
76            self.events_counter += 1;
77
78            if self.events_counter > self.num_events_after {
79                self.events_counter = 0;
80                self.is_flushing = false;
81                self.events_dropped.emit(Count(1));
82            } else {
83                output.push(event);
84            }
85        } else if self.buffer.len() >= self.num_events_before {
86            self.buffer.pop_front();
87            self.buffer.push_back(event);
88            self.events_dropped.emit(Count(1));
89        } else if self.num_events_before > 0 {
90            self.buffer.push_back(event);
91        } else {
92            self.events_dropped.emit(Count(1));
93        }
94    }
95}
96
97#[cfg(test)]
98mod test {
99    use std::ops::RangeInclusive;
100    use tokio::sync::mpsc;
101    use tokio::sync::mpsc::{Receiver, Sender};
102    use tokio_stream::wrappers::ReceiverStream;
103    use vrl::core::Value;
104
105    use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig};
106    use crate::transforms::window::config::WindowConfig;
107    use crate::{
108        event::{Event, LogEvent},
109        test_util::components::assert_transform_compliance,
110        transforms::test::create_topology,
111    };
112
113    #[tokio::test]
114    async fn test_flush() {
115        assert_transform_compliance(async {
116            let flush_when = get_condition("flush");
117            let transform_config = get_transform_config(flush_when, None, 1, 0);
118
119            let (tx, rx) = mpsc::channel(1);
120            let (topology, mut out) =
121                create_topology(ReceiverStream::new(rx), transform_config).await;
122
123            send_event(&tx, "flush").await;
124            assert_event("flush", out.recv().await).await;
125
126            drop(tx);
127            topology.stop().await;
128
129            assert_eq!(out.recv().await, None);
130        })
131        .await;
132    }
133
134    #[tokio::test]
135    async fn test_pass() {
136        assert_transform_compliance(async {
137            let flush_when = get_condition("flush");
138            let forward_when = get_condition("forward");
139            let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0);
140
141            let (tx, rx) = mpsc::channel(1);
142            let (topology, mut out) =
143                create_topology(ReceiverStream::new(rx), transform_config).await;
144
145            send_event(&tx, "forward").await;
146            assert_event("forward", out.recv().await).await;
147
148            drop(tx);
149            topology.stop().await;
150
151            assert_eq!(out.recv().await, None);
152        })
153        .await;
154    }
155
156    #[tokio::test]
157    async fn test_10_in_50() {
158        assert_transform_compliance(async {
159            let flush_when = get_condition("flush");
160            let transform_config = get_transform_config(flush_when, None, 50, 0);
161
162            let (tx, rx) = mpsc::channel(1);
163            let (topology, mut out) =
164                create_topology(ReceiverStream::new(rx), transform_config).await;
165
166            send_events(&tx, generate_events(1..=10)).await;
167            send_event(&tx, "flush").await;
168
169            let mut expected: [&str; 11] = [
170                "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush",
171            ];
172
173            assert_events(&mut expected, &mut out).await;
174
175            drop(tx);
176            topology.stop().await;
177
178            assert_eq!(out.recv().await, None);
179        })
180        .await;
181    }
182
183    #[tokio::test]
184    async fn test_50_in_10() {
185        assert_transform_compliance(async {
186            let flush_when = get_condition("flush");
187            let transform_config = get_transform_config(flush_when, None, 10, 0);
188
189            let (tx, rx) = mpsc::channel(1);
190            let (topology, mut out) =
191                create_topology(ReceiverStream::new(rx), transform_config).await;
192
193            send_events(&tx, generate_events(1..=50)).await;
194            send_event(&tx, "flush").await;
195
196            let mut expected: [&str; 11] = [
197                "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
198            ];
199
200            assert_events(&mut expected, &mut out).await;
201
202            drop(tx);
203            topology.stop().await;
204
205            assert_eq!(out.recv().await, None);
206        })
207        .await;
208    }
209
210    #[tokio::test]
211    async fn test_before_and_after() {
212        assert_transform_compliance(async {
213            let flush_when = get_condition("flush");
214            let transform_config = get_transform_config(flush_when, None, 10, 5);
215
216            let (tx, rx) = mpsc::channel(1);
217            let (topology, mut out) =
218                create_topology(ReceiverStream::new(rx), transform_config).await;
219
220            send_events(&tx, generate_events(1..=50)).await;
221            send_event(&tx, "flush").await;
222            send_events(&tx, generate_events(51..=70)).await;
223
224            let mut expected: [&str; 16] = [
225                "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
226                "A51", "A52", "A53", "A54", "A55",
227            ];
228
229            assert_events(&mut expected, &mut out).await;
230
231            drop(tx);
232            topology.stop().await;
233
234            assert_eq!(out.recv().await, None);
235        })
236        .await;
237    }
238
239    #[tokio::test]
240    async fn test_flush_and_pass() {
241        assert_transform_compliance(async {
242            let flush_when = get_condition("flush");
243            let forward_when = get_condition("forward");
244            let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5);
245
246            let (tx, rx) = mpsc::channel(1);
247            let (topology, mut out) =
248                create_topology(ReceiverStream::new(rx), transform_config).await;
249
250            send_events(&tx, generate_events(1..=5)).await;
251            send_event(&tx, "forward").await;
252            send_events(&tx, generate_events(6..=10)).await;
253            send_event(&tx, "forward").await;
254            send_event(&tx, "flush").await;
255            send_event(&tx, "forward").await;
256            send_events(&tx, generate_events(11..=15)).await;
257            send_event(&tx, "forward").await;
258            send_events(&tx, generate_events(16..=20)).await;
259
260            let mut expected: [&str; 20] = [
261                "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08",
262                "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward",
263            ];
264
265            assert_events(&mut expected, &mut out).await;
266
267            drop(tx);
268            topology.stop().await;
269
270            assert_eq!(out.recv().await, None);
271        })
272        .await;
273    }
274
275    #[tokio::test]
276    async fn test_zero_before() {
277        assert_transform_compliance(async {
278            let flush_when = get_condition("flush");
279            let transform_config = get_transform_config(flush_when, None, 0, 5);
280
281            let (tx, rx) = mpsc::channel(1);
282            let (topology, mut out) =
283                create_topology(ReceiverStream::new(rx), transform_config).await;
284
285            send_events(&tx, generate_events(1..=50)).await;
286            send_event(&tx, "flush").await;
287            send_events(&tx, generate_events(51..=70)).await;
288
289            let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"];
290            assert_events(&mut expected, &mut out).await;
291
292            drop(tx);
293            topology.stop().await;
294
295            assert_eq!(out.recv().await, None);
296        })
297        .await;
298    }
299
300    #[tokio::test]
301    async fn test_zero_flush() {
302        assert_transform_compliance(async {
303            let flush_when = get_condition("flush");
304            let transform_config = get_transform_config(flush_when, None, 0, 0);
305
306            let (tx, rx) = mpsc::channel(1);
307            let (topology, mut out) =
308                create_topology(ReceiverStream::new(rx), transform_config).await;
309
310            send_events(&tx, generate_events(1..=50)).await;
311            send_event(&tx, "flush").await;
312            send_events(&tx, generate_events(51..=70)).await;
313
314            let mut expected: [&str; 1] = ["flush"];
315            assert_events(&mut expected, &mut out).await;
316
317            drop(tx);
318            topology.stop().await;
319
320            assert_eq!(out.recv().await, None);
321        })
322        .await;
323    }
324
325    #[tokio::test]
326    async fn test_zero_pass() {
327        assert_transform_compliance(async {
328            let flush_when = get_condition("flush");
329            let forward_when = get_condition("forward");
330            let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0);
331
332            let (tx, rx) = mpsc::channel(1);
333            let (topology, mut out) =
334                create_topology(ReceiverStream::new(rx), transform_config).await;
335
336            let events = generate_events(1..=50);
337            let more_events = generate_events(51..=70);
338
339            send_events(&tx, events).await;
340            send_event(&tx, "forward").await;
341            send_event(&tx, "flush").await;
342            send_events(&tx, more_events).await;
343
344            let mut expected: [&str; 2] = ["forward", "flush"];
345            assert_events(&mut expected, &mut out).await;
346
347            drop(tx);
348            topology.stop().await;
349
350            assert_eq!(out.recv().await, None);
351        })
352        .await;
353    }
354
    /// Assembles a `WindowConfig` from its four fields, moving each argument
    /// into the field of the same name.
    const fn get_transform_config(
        flush_when: AnyCondition,
        forward_when: Option<AnyCondition>,
        num_events_before: usize,
        num_events_after: usize,
    ) -> WindowConfig {
        WindowConfig {
            flush_when,
            forward_when,
            num_events_before,
            num_events_after,
        }
    }
368
369    fn get_condition(message: &str) -> AnyCondition {
370        AnyCondition::from(ConditionConfig::Vrl(VrlConfig {
371            source: format!(r#".message == "{message}""#),
372            runtime: Default::default(),
373        }))
374    }
375
376    fn generate_events(range: RangeInclusive<i32>) -> Vec<Event> {
377        range
378            .map(|n| format!("A{n:02}"))
379            .map(|m| Event::from(LogEvent::from(m)))
380            .collect::<Vec<Event>>()
381    }
382
383    async fn send_events(tx: &Sender<Event>, events: Vec<Event>) {
384        for event in events {
385            tx.send(event).await.unwrap();
386        }
387    }
388
389    async fn send_event(tx: &Sender<Event>, message: &str) {
390        tx.send(Event::from(LogEvent::from(message))).await.unwrap();
391    }
392
393    async fn assert_event(message: &str, event: Option<Event>) {
394        assert_eq!(
395            &Value::from(message),
396            event.unwrap().as_log().get("message").unwrap()
397        );
398    }
399
400    async fn assert_events(messages: &mut [&str], out: &mut Receiver<Event>) {
401        for message in messages {
402            assert_event(message, out.recv().await).await;
403        }
404    }
405}