1use std::collections::VecDeque;
2
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6 conditions::Condition,
7 event::Event,
8 internal_events::WindowEventsDropped,
9 transforms::{FunctionTransform, OutputBuffer},
10};
11
12#[derive(Clone)]
13pub struct Window {
14 forward_when: Option<Condition>,
16 flush_when: Condition,
17 num_events_before: usize,
18 num_events_after: usize,
19
20 buffer: VecDeque<Event>,
22 events_counter: usize,
23 events_dropped: Registered<WindowEventsDropped>,
24 is_flushing: bool,
25}
26
27impl Window {
28 pub fn new(
29 forward_when: Option<Condition>,
30 flush_when: Condition,
31 num_events_before: usize,
32 num_events_after: usize,
33 ) -> crate::Result<Self> {
34 let buffer = VecDeque::with_capacity(num_events_before);
35
36 Ok(Window {
37 forward_when,
38 flush_when,
39 num_events_before,
40 num_events_after,
41 events_dropped: register!(WindowEventsDropped),
42 buffer,
43 events_counter: 0,
44 is_flushing: false,
45 })
46 }
47}
48
49impl FunctionTransform for Window {
50 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
51 let (pass, event) = match self.forward_when.as_ref() {
52 Some(condition) => {
53 let (result, event) = condition.check(event);
54 (result, event)
55 }
56 _ => (false, event),
57 };
58
59 let (flush, event) = self.flush_when.check(event);
60
61 if self.buffer.capacity() < self.num_events_before {
62 self.buffer.reserve(self.num_events_before);
63 }
64
65 if pass {
66 output.push(event);
67 } else if flush {
68 if self.num_events_before > 0 {
69 self.buffer.drain(..).for_each(|evt| output.push(evt));
70 }
71
72 self.events_counter = 0;
73 self.is_flushing = true;
74 output.push(event);
75 } else if self.is_flushing {
76 self.events_counter += 1;
77
78 if self.events_counter > self.num_events_after {
79 self.events_counter = 0;
80 self.is_flushing = false;
81 self.events_dropped.emit(Count(1));
82 } else {
83 output.push(event);
84 }
85 } else if self.buffer.len() >= self.num_events_before {
86 self.buffer.pop_front();
87 self.buffer.push_back(event);
88 self.events_dropped.emit(Count(1));
89 } else if self.num_events_before > 0 {
90 self.buffer.push_back(event);
91 } else {
92 self.events_dropped.emit(Count(1));
93 }
94 }
95}
96
97#[cfg(test)]
98mod test {
99 use std::ops::RangeInclusive;
100 use tokio::sync::mpsc;
101 use tokio::sync::mpsc::{Receiver, Sender};
102 use tokio_stream::wrappers::ReceiverStream;
103 use vrl::core::Value;
104
105 use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig};
106 use crate::transforms::window::config::WindowConfig;
107 use crate::{
108 event::{Event, LogEvent},
109 test_util::components::assert_transform_compliance,
110 transforms::test::create_topology,
111 };
112
113 #[tokio::test]
114 async fn test_flush() {
115 assert_transform_compliance(async {
116 let flush_when = get_condition("flush");
117 let transform_config = get_transform_config(flush_when, None, 1, 0);
118
119 let (tx, rx) = mpsc::channel(1);
120 let (topology, mut out) =
121 create_topology(ReceiverStream::new(rx), transform_config).await;
122
123 send_event(&tx, "flush").await;
124 assert_event("flush", out.recv().await).await;
125
126 drop(tx);
127 topology.stop().await;
128
129 assert_eq!(out.recv().await, None);
130 })
131 .await;
132 }
133
134 #[tokio::test]
135 async fn test_pass() {
136 assert_transform_compliance(async {
137 let flush_when = get_condition("flush");
138 let forward_when = get_condition("forward");
139 let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0);
140
141 let (tx, rx) = mpsc::channel(1);
142 let (topology, mut out) =
143 create_topology(ReceiverStream::new(rx), transform_config).await;
144
145 send_event(&tx, "forward").await;
146 assert_event("forward", out.recv().await).await;
147
148 drop(tx);
149 topology.stop().await;
150
151 assert_eq!(out.recv().await, None);
152 })
153 .await;
154 }
155
156 #[tokio::test]
157 async fn test_10_in_50() {
158 assert_transform_compliance(async {
159 let flush_when = get_condition("flush");
160 let transform_config = get_transform_config(flush_when, None, 50, 0);
161
162 let (tx, rx) = mpsc::channel(1);
163 let (topology, mut out) =
164 create_topology(ReceiverStream::new(rx), transform_config).await;
165
166 send_events(&tx, generate_events(1..=10)).await;
167 send_event(&tx, "flush").await;
168
169 let mut expected: [&str; 11] = [
170 "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush",
171 ];
172
173 assert_events(&mut expected, &mut out).await;
174
175 drop(tx);
176 topology.stop().await;
177
178 assert_eq!(out.recv().await, None);
179 })
180 .await;
181 }
182
183 #[tokio::test]
184 async fn test_50_in_10() {
185 assert_transform_compliance(async {
186 let flush_when = get_condition("flush");
187 let transform_config = get_transform_config(flush_when, None, 10, 0);
188
189 let (tx, rx) = mpsc::channel(1);
190 let (topology, mut out) =
191 create_topology(ReceiverStream::new(rx), transform_config).await;
192
193 send_events(&tx, generate_events(1..=50)).await;
194 send_event(&tx, "flush").await;
195
196 let mut expected: [&str; 11] = [
197 "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
198 ];
199
200 assert_events(&mut expected, &mut out).await;
201
202 drop(tx);
203 topology.stop().await;
204
205 assert_eq!(out.recv().await, None);
206 })
207 .await;
208 }
209
210 #[tokio::test]
211 async fn test_before_and_after() {
212 assert_transform_compliance(async {
213 let flush_when = get_condition("flush");
214 let transform_config = get_transform_config(flush_when, None, 10, 5);
215
216 let (tx, rx) = mpsc::channel(1);
217 let (topology, mut out) =
218 create_topology(ReceiverStream::new(rx), transform_config).await;
219
220 send_events(&tx, generate_events(1..=50)).await;
221 send_event(&tx, "flush").await;
222 send_events(&tx, generate_events(51..=70)).await;
223
224 let mut expected: [&str; 16] = [
225 "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
226 "A51", "A52", "A53", "A54", "A55",
227 ];
228
229 assert_events(&mut expected, &mut out).await;
230
231 drop(tx);
232 topology.stop().await;
233
234 assert_eq!(out.recv().await, None);
235 })
236 .await;
237 }
238
239 #[tokio::test]
240 async fn test_flush_and_pass() {
241 assert_transform_compliance(async {
242 let flush_when = get_condition("flush");
243 let forward_when = get_condition("forward");
244 let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5);
245
246 let (tx, rx) = mpsc::channel(1);
247 let (topology, mut out) =
248 create_topology(ReceiverStream::new(rx), transform_config).await;
249
250 send_events(&tx, generate_events(1..=5)).await;
251 send_event(&tx, "forward").await;
252 send_events(&tx, generate_events(6..=10)).await;
253 send_event(&tx, "forward").await;
254 send_event(&tx, "flush").await;
255 send_event(&tx, "forward").await;
256 send_events(&tx, generate_events(11..=15)).await;
257 send_event(&tx, "forward").await;
258 send_events(&tx, generate_events(16..=20)).await;
259
260 let mut expected: [&str; 20] = [
261 "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08",
262 "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward",
263 ];
264
265 assert_events(&mut expected, &mut out).await;
266
267 drop(tx);
268 topology.stop().await;
269
270 assert_eq!(out.recv().await, None);
271 })
272 .await;
273 }
274
275 #[tokio::test]
276 async fn test_zero_before() {
277 assert_transform_compliance(async {
278 let flush_when = get_condition("flush");
279 let transform_config = get_transform_config(flush_when, None, 0, 5);
280
281 let (tx, rx) = mpsc::channel(1);
282 let (topology, mut out) =
283 create_topology(ReceiverStream::new(rx), transform_config).await;
284
285 send_events(&tx, generate_events(1..=50)).await;
286 send_event(&tx, "flush").await;
287 send_events(&tx, generate_events(51..=70)).await;
288
289 let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"];
290 assert_events(&mut expected, &mut out).await;
291
292 drop(tx);
293 topology.stop().await;
294
295 assert_eq!(out.recv().await, None);
296 })
297 .await;
298 }
299
300 #[tokio::test]
301 async fn test_zero_flush() {
302 assert_transform_compliance(async {
303 let flush_when = get_condition("flush");
304 let transform_config = get_transform_config(flush_when, None, 0, 0);
305
306 let (tx, rx) = mpsc::channel(1);
307 let (topology, mut out) =
308 create_topology(ReceiverStream::new(rx), transform_config).await;
309
310 send_events(&tx, generate_events(1..=50)).await;
311 send_event(&tx, "flush").await;
312 send_events(&tx, generate_events(51..=70)).await;
313
314 let mut expected: [&str; 1] = ["flush"];
315 assert_events(&mut expected, &mut out).await;
316
317 drop(tx);
318 topology.stop().await;
319
320 assert_eq!(out.recv().await, None);
321 })
322 .await;
323 }
324
325 #[tokio::test]
326 async fn test_zero_pass() {
327 assert_transform_compliance(async {
328 let flush_when = get_condition("flush");
329 let forward_when = get_condition("forward");
330 let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0);
331
332 let (tx, rx) = mpsc::channel(1);
333 let (topology, mut out) =
334 create_topology(ReceiverStream::new(rx), transform_config).await;
335
336 let events = generate_events(1..=50);
337 let more_events = generate_events(51..=70);
338
339 send_events(&tx, events).await;
340 send_event(&tx, "forward").await;
341 send_event(&tx, "flush").await;
342 send_events(&tx, more_events).await;
343
344 let mut expected: [&str; 2] = ["forward", "flush"];
345 assert_events(&mut expected, &mut out).await;
346
347 drop(tx);
348 topology.stop().await;
349
350 assert_eq!(out.recv().await, None);
351 })
352 .await;
353 }
354
355 const fn get_transform_config(
356 flush_when: AnyCondition,
357 forward_when: Option<AnyCondition>,
358 num_events_before: usize,
359 num_events_after: usize,
360 ) -> WindowConfig {
361 WindowConfig {
362 flush_when,
363 forward_when,
364 num_events_before,
365 num_events_after,
366 }
367 }
368
369 fn get_condition(message: &str) -> AnyCondition {
370 AnyCondition::from(ConditionConfig::Vrl(VrlConfig {
371 source: format!(r#".message == "{message}""#),
372 runtime: Default::default(),
373 }))
374 }
375
376 fn generate_events(range: RangeInclusive<i32>) -> Vec<Event> {
377 range
378 .map(|n| format!("A{n:02}"))
379 .map(|m| Event::from(LogEvent::from(m)))
380 .collect::<Vec<Event>>()
381 }
382
383 async fn send_events(tx: &Sender<Event>, events: Vec<Event>) {
384 for event in events {
385 tx.send(event).await.unwrap();
386 }
387 }
388
389 async fn send_event(tx: &Sender<Event>, message: &str) {
390 tx.send(Event::from(LogEvent::from(message))).await.unwrap();
391 }
392
393 async fn assert_event(message: &str, event: Option<Event>) {
394 assert_eq!(
395 &Value::from(message),
396 event.unwrap().as_log().get("message").unwrap()
397 );
398 }
399
400 async fn assert_events(messages: &mut [&str], out: &mut Receiver<Event>) {
401 for message in messages {
402 assert_event(message, out.recv().await).await;
403 }
404 }
405}