vector/transforms/dedupe/
timed_transform.rs1use std::{future::ready, num::NonZeroUsize, pin::Pin, time::Instant};
2
3use futures::{Stream, StreamExt};
4use lru::LruCache;
5
6use super::{
7 common::{FieldMatchConfig, TimedCacheConfig},
8 transform::{CacheEntry, build_cache_entry},
9};
10use crate::{event::Event, internal_events::DedupeEventsDropped, transforms::TaskTransform};
11
12#[derive(Clone)]
13pub struct TimedDedupe {
14 fields: FieldMatchConfig,
15 cache: LruCache<CacheEntry, Instant>,
16 time_config: TimedCacheConfig,
17}
18
19impl TimedDedupe {
20 pub fn new(
21 num_entries: NonZeroUsize,
22 fields: FieldMatchConfig,
23 time_config: TimedCacheConfig,
24 ) -> Self {
25 Self {
26 fields,
27 cache: LruCache::new(num_entries),
28 time_config,
29 }
30 }
31
32 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
33 let cache_entry = build_cache_entry(&event, &self.fields);
34 let now = Instant::now();
35 let drop_event = match self.cache.get(&cache_entry) {
36 Some(&time) => {
37 let drop = now.duration_since(time) < self.time_config.max_age_ms;
38 if self.time_config.refresh_on_drop || !drop {
39 self.cache.put(cache_entry, now);
40 }
41 drop
42 }
43 None => {
44 self.cache.put(cache_entry, now);
45 false
46 }
47 };
48 if drop_event {
49 emit!(DedupeEventsDropped { count: 1 });
50 None
51 } else {
52 Some(event)
53 }
54 }
55}
56
57impl TaskTransform<Event> for TimedDedupe {
58 fn transform(
59 self: Box<Self>,
60 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
61 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
62 where
63 Self: 'static,
64 {
65 let mut inner = self;
66 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
67 }
68}