1use std::collections::VecDeque;
2
3use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered};
4
5use crate::{
6 conditions::Condition,
7 event::Event,
8 internal_events::WindowEventsDropped,
9 transforms::{FunctionTransform, OutputBuffer},
10};
11
12#[derive(Clone)]
13pub struct Window {
14 forward_when: Option<Condition>,
16 flush_when: Condition,
17 num_events_before: usize,
18 num_events_after: usize,
19
20 buffer: VecDeque<Event>,
22 events_counter: usize,
23 events_dropped: Registered<WindowEventsDropped>,
24 is_flushing: bool,
25}
26
27impl Window {
28 pub fn new(
29 forward_when: Option<Condition>,
30 flush_when: Condition,
31 num_events_before: usize,
32 num_events_after: usize,
33 ) -> crate::Result<Self> {
34 let buffer = VecDeque::with_capacity(num_events_before);
35
36 Ok(Window {
37 forward_when,
38 flush_when,
39 num_events_before,
40 num_events_after,
41 events_dropped: register!(WindowEventsDropped),
42 buffer,
43 events_counter: 0,
44 is_flushing: false,
45 })
46 }
47}
48
49impl FunctionTransform for Window {
50 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
51 let (pass, event) = match self.forward_when.as_ref() {
52 Some(condition) => {
53 let (result, event) = condition.check(event);
54 (result, event)
55 }
56 _ => (false, event),
57 };
58
59 let (flush, event) = self.flush_when.check(event);
60
61 if self.buffer.capacity() < self.num_events_before {
62 self.buffer.reserve(self.num_events_before);
63 }
64
65 if pass {
66 output.push(event);
67 } else if flush {
68 if self.num_events_before > 0 {
69 self.buffer.drain(..).for_each(|evt| output.push(evt));
70 }
71
72 self.events_counter = 0;
73 self.is_flushing = true;
74 output.push(event);
75 } else if self.is_flushing {
76 self.events_counter += 1;
77
78 if self.events_counter > self.num_events_after {
79 self.events_counter = 0;
80 self.is_flushing = false;
81 self.events_dropped.emit(Count(1));
82 } else {
83 output.push(event);
84 }
85 } else if self.buffer.len() >= self.num_events_before {
86 self.buffer.pop_front();
87 self.buffer.push_back(event);
88 self.events_dropped.emit(Count(1));
89 } else if self.num_events_before > 0 {
90 self.buffer.push_back(event);
91 } else {
92 self.events_dropped.emit(Count(1));
93 }
94 }
95}
96
97#[cfg(test)]
98mod test {
99 use std::ops::RangeInclusive;
100
101 use tokio::sync::{
102 mpsc,
103 mpsc::{Receiver, Sender},
104 };
105 use tokio_stream::wrappers::ReceiverStream;
106 use vrl::core::Value;
107
108 use crate::{
109 conditions::{AnyCondition, ConditionConfig, VrlConfig},
110 event::{Event, LogEvent},
111 test_util::components::assert_transform_compliance,
112 transforms::{test::create_topology, window::config::WindowConfig},
113 };
114
115 #[tokio::test]
116 async fn test_flush() {
117 assert_transform_compliance(async {
118 let flush_when = get_condition("flush");
119 let transform_config = get_transform_config(flush_when, None, 1, 0);
120
121 let (tx, rx) = mpsc::channel(1);
122 let (topology, mut out) =
123 create_topology(ReceiverStream::new(rx), transform_config).await;
124
125 send_event(&tx, "flush").await;
126 assert_event("flush", out.recv().await).await;
127
128 drop(tx);
129 topology.stop().await;
130
131 assert_eq!(out.recv().await, None);
132 })
133 .await;
134 }
135
136 #[tokio::test]
137 async fn test_pass() {
138 assert_transform_compliance(async {
139 let flush_when = get_condition("flush");
140 let forward_when = get_condition("forward");
141 let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0);
142
143 let (tx, rx) = mpsc::channel(1);
144 let (topology, mut out) =
145 create_topology(ReceiverStream::new(rx), transform_config).await;
146
147 send_event(&tx, "forward").await;
148 assert_event("forward", out.recv().await).await;
149
150 drop(tx);
151 topology.stop().await;
152
153 assert_eq!(out.recv().await, None);
154 })
155 .await;
156 }
157
158 #[tokio::test]
159 async fn test_10_in_50() {
160 assert_transform_compliance(async {
161 let flush_when = get_condition("flush");
162 let transform_config = get_transform_config(flush_when, None, 50, 0);
163
164 let (tx, rx) = mpsc::channel(1);
165 let (topology, mut out) =
166 create_topology(ReceiverStream::new(rx), transform_config).await;
167
168 send_events(&tx, generate_events(1..=10)).await;
169 send_event(&tx, "flush").await;
170
171 let mut expected: [&str; 11] = [
172 "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush",
173 ];
174
175 assert_events(&mut expected, &mut out).await;
176
177 drop(tx);
178 topology.stop().await;
179
180 assert_eq!(out.recv().await, None);
181 })
182 .await;
183 }
184
185 #[tokio::test]
186 async fn test_50_in_10() {
187 assert_transform_compliance(async {
188 let flush_when = get_condition("flush");
189 let transform_config = get_transform_config(flush_when, None, 10, 0);
190
191 let (tx, rx) = mpsc::channel(1);
192 let (topology, mut out) =
193 create_topology(ReceiverStream::new(rx), transform_config).await;
194
195 send_events(&tx, generate_events(1..=50)).await;
196 send_event(&tx, "flush").await;
197
198 let mut expected: [&str; 11] = [
199 "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
200 ];
201
202 assert_events(&mut expected, &mut out).await;
203
204 drop(tx);
205 topology.stop().await;
206
207 assert_eq!(out.recv().await, None);
208 })
209 .await;
210 }
211
212 #[tokio::test]
213 async fn test_before_and_after() {
214 assert_transform_compliance(async {
215 let flush_when = get_condition("flush");
216 let transform_config = get_transform_config(flush_when, None, 10, 5);
217
218 let (tx, rx) = mpsc::channel(1);
219 let (topology, mut out) =
220 create_topology(ReceiverStream::new(rx), transform_config).await;
221
222 send_events(&tx, generate_events(1..=50)).await;
223 send_event(&tx, "flush").await;
224 send_events(&tx, generate_events(51..=70)).await;
225
226 let mut expected: [&str; 16] = [
227 "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush",
228 "A51", "A52", "A53", "A54", "A55",
229 ];
230
231 assert_events(&mut expected, &mut out).await;
232
233 drop(tx);
234 topology.stop().await;
235
236 assert_eq!(out.recv().await, None);
237 })
238 .await;
239 }
240
241 #[tokio::test]
242 async fn test_flush_and_pass() {
243 assert_transform_compliance(async {
244 let flush_when = get_condition("flush");
245 let forward_when = get_condition("forward");
246 let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5);
247
248 let (tx, rx) = mpsc::channel(1);
249 let (topology, mut out) =
250 create_topology(ReceiverStream::new(rx), transform_config).await;
251
252 send_events(&tx, generate_events(1..=5)).await;
253 send_event(&tx, "forward").await;
254 send_events(&tx, generate_events(6..=10)).await;
255 send_event(&tx, "forward").await;
256 send_event(&tx, "flush").await;
257 send_event(&tx, "forward").await;
258 send_events(&tx, generate_events(11..=15)).await;
259 send_event(&tx, "forward").await;
260 send_events(&tx, generate_events(16..=20)).await;
261
262 let mut expected: [&str; 20] = [
263 "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08",
264 "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward",
265 ];
266
267 assert_events(&mut expected, &mut out).await;
268
269 drop(tx);
270 topology.stop().await;
271
272 assert_eq!(out.recv().await, None);
273 })
274 .await;
275 }
276
277 #[tokio::test]
278 async fn test_zero_before() {
279 assert_transform_compliance(async {
280 let flush_when = get_condition("flush");
281 let transform_config = get_transform_config(flush_when, None, 0, 5);
282
283 let (tx, rx) = mpsc::channel(1);
284 let (topology, mut out) =
285 create_topology(ReceiverStream::new(rx), transform_config).await;
286
287 send_events(&tx, generate_events(1..=50)).await;
288 send_event(&tx, "flush").await;
289 send_events(&tx, generate_events(51..=70)).await;
290
291 let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"];
292 assert_events(&mut expected, &mut out).await;
293
294 drop(tx);
295 topology.stop().await;
296
297 assert_eq!(out.recv().await, None);
298 })
299 .await;
300 }
301
302 #[tokio::test]
303 async fn test_zero_flush() {
304 assert_transform_compliance(async {
305 let flush_when = get_condition("flush");
306 let transform_config = get_transform_config(flush_when, None, 0, 0);
307
308 let (tx, rx) = mpsc::channel(1);
309 let (topology, mut out) =
310 create_topology(ReceiverStream::new(rx), transform_config).await;
311
312 send_events(&tx, generate_events(1..=50)).await;
313 send_event(&tx, "flush").await;
314 send_events(&tx, generate_events(51..=70)).await;
315
316 let mut expected: [&str; 1] = ["flush"];
317 assert_events(&mut expected, &mut out).await;
318
319 drop(tx);
320 topology.stop().await;
321
322 assert_eq!(out.recv().await, None);
323 })
324 .await;
325 }
326
327 #[tokio::test]
328 async fn test_zero_pass() {
329 assert_transform_compliance(async {
330 let flush_when = get_condition("flush");
331 let forward_when = get_condition("forward");
332 let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0);
333
334 let (tx, rx) = mpsc::channel(1);
335 let (topology, mut out) =
336 create_topology(ReceiverStream::new(rx), transform_config).await;
337
338 let events = generate_events(1..=50);
339 let more_events = generate_events(51..=70);
340
341 send_events(&tx, events).await;
342 send_event(&tx, "forward").await;
343 send_event(&tx, "flush").await;
344 send_events(&tx, more_events).await;
345
346 let mut expected: [&str; 2] = ["forward", "flush"];
347 assert_events(&mut expected, &mut out).await;
348
349 drop(tx);
350 topology.stop().await;
351
352 assert_eq!(out.recv().await, None);
353 })
354 .await;
355 }
356
357 const fn get_transform_config(
358 flush_when: AnyCondition,
359 forward_when: Option<AnyCondition>,
360 num_events_before: usize,
361 num_events_after: usize,
362 ) -> WindowConfig {
363 WindowConfig {
364 flush_when,
365 forward_when,
366 num_events_before,
367 num_events_after,
368 }
369 }
370
371 fn get_condition(message: &str) -> AnyCondition {
372 AnyCondition::from(ConditionConfig::Vrl(VrlConfig {
373 source: format!(r#".message == "{message}""#),
374 runtime: Default::default(),
375 }))
376 }
377
378 fn generate_events(range: RangeInclusive<i32>) -> Vec<Event> {
379 range
380 .map(|n| format!("A{n:02}"))
381 .map(|m| Event::from(LogEvent::from(m)))
382 .collect::<Vec<Event>>()
383 }
384
385 async fn send_events(tx: &Sender<Event>, events: Vec<Event>) {
386 for event in events {
387 tx.send(event).await.unwrap();
388 }
389 }
390
391 async fn send_event(tx: &Sender<Event>, message: &str) {
392 tx.send(Event::from(LogEvent::from(message))).await.unwrap();
393 }
394
395 async fn assert_event(message: &str, event: Option<Event>) {
396 assert_eq!(
397 &Value::from(message),
398 event.unwrap().as_log().get("message").unwrap()
399 );
400 }
401
402 async fn assert_events(messages: &mut [&str], out: &mut Receiver<Event>) {
403 for message in messages {
404 assert_event(message, out.recv().await).await;
405 }
406 }
407}