vector/transforms/dedupe/
timed_transform.rs

1use std::{future::ready, num::NonZeroUsize, pin::Pin, time::Instant};
2
3use futures::{Stream, StreamExt};
4use lru::LruCache;
5
6use crate::{event::Event, internal_events::DedupeEventsDropped, transforms::TaskTransform};
7
8use super::{
9    common::{FieldMatchConfig, TimedCacheConfig},
10    transform::{build_cache_entry, CacheEntry},
11};
12
13#[derive(Clone)]
14pub struct TimedDedupe {
15    fields: FieldMatchConfig,
16    cache: LruCache<CacheEntry, Instant>,
17    time_config: TimedCacheConfig,
18}
19
20impl TimedDedupe {
21    pub fn new(
22        num_entries: NonZeroUsize,
23        fields: FieldMatchConfig,
24        time_config: TimedCacheConfig,
25    ) -> Self {
26        Self {
27            fields,
28            cache: LruCache::new(num_entries),
29            time_config,
30        }
31    }
32
33    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
34        let cache_entry = build_cache_entry(&event, &self.fields);
35        let now = Instant::now();
36        let drop_event = match self.cache.get(&cache_entry) {
37            Some(&time) => {
38                let drop = now.duration_since(time) < self.time_config.max_age_ms;
39                if self.time_config.refresh_on_drop || !drop {
40                    self.cache.put(cache_entry, now);
41                }
42                drop
43            }
44            None => {
45                self.cache.put(cache_entry, now);
46                false
47            }
48        };
49        if drop_event {
50            emit!(DedupeEventsDropped { count: 1 });
51            None
52        } else {
53            Some(event)
54        }
55    }
56}
57
58impl TaskTransform<Event> for TimedDedupe {
59    fn transform(
60        self: Box<Self>,
61        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
62    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
63    where
64        Self: 'static,
65    {
66        let mut inner = self;
67        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
68    }
69}