vector/transforms/dedupe/timed_transform.rs

1use std::{future::ready, num::NonZeroUsize, pin::Pin, time::Instant};
2
3use futures::{Stream, StreamExt};
4use lru::LruCache;
5
6use super::{
7    common::{FieldMatchConfig, TimedCacheConfig},
8    transform::{CacheEntry, build_cache_entry},
9};
10use crate::{event::Event, internal_events::DedupeEventsDropped, transforms::TaskTransform};
11
12#[derive(Clone)]
13pub struct TimedDedupe {
14    fields: FieldMatchConfig,
15    cache: LruCache<CacheEntry, Instant>,
16    time_config: TimedCacheConfig,
17}
18
19impl TimedDedupe {
20    pub fn new(
21        num_entries: NonZeroUsize,
22        fields: FieldMatchConfig,
23        time_config: TimedCacheConfig,
24    ) -> Self {
25        Self {
26            fields,
27            cache: LruCache::new(num_entries),
28            time_config,
29        }
30    }
31
32    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
33        let cache_entry = build_cache_entry(&event, &self.fields);
34        let now = Instant::now();
35        let drop_event = match self.cache.get(&cache_entry) {
36            Some(&time) => {
37                let drop = now.duration_since(time) < self.time_config.max_age_ms;
38                if self.time_config.refresh_on_drop || !drop {
39                    self.cache.put(cache_entry, now);
40                }
41                drop
42            }
43            None => {
44                self.cache.put(cache_entry, now);
45                false
46            }
47        };
48        if drop_event {
49            emit!(DedupeEventsDropped { count: 1 });
50            None
51        } else {
52            Some(event)
53        }
54    }
55}
56
57impl TaskTransform<Event> for TimedDedupe {
58    fn transform(
59        self: Box<Self>,
60        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
61    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
62    where
63        Self: 'static,
64    {
65        let mut inner = self;
66        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
67    }
68}