vector/transforms/dedupe/
timed_transform.rs1use std::{future::ready, num::NonZeroUsize, pin::Pin, time::Instant};
2
3use futures::{Stream, StreamExt};
4use lru::LruCache;
5
6use crate::{event::Event, internal_events::DedupeEventsDropped, transforms::TaskTransform};
7
8use super::{
9 common::{FieldMatchConfig, TimedCacheConfig},
10 transform::{build_cache_entry, CacheEntry},
11};
12
13#[derive(Clone)]
14pub struct TimedDedupe {
15 fields: FieldMatchConfig,
16 cache: LruCache<CacheEntry, Instant>,
17 time_config: TimedCacheConfig,
18}
19
20impl TimedDedupe {
21 pub fn new(
22 num_entries: NonZeroUsize,
23 fields: FieldMatchConfig,
24 time_config: TimedCacheConfig,
25 ) -> Self {
26 Self {
27 fields,
28 cache: LruCache::new(num_entries),
29 time_config,
30 }
31 }
32
33 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
34 let cache_entry = build_cache_entry(&event, &self.fields);
35 let now = Instant::now();
36 let drop_event = match self.cache.get(&cache_entry) {
37 Some(&time) => {
38 let drop = now.duration_since(time) < self.time_config.max_age_ms;
39 if self.time_config.refresh_on_drop || !drop {
40 self.cache.put(cache_entry, now);
41 }
42 drop
43 }
44 None => {
45 self.cache.put(cache_entry, now);
46 false
47 }
48 };
49 if drop_event {
50 emit!(DedupeEventsDropped { count: 1 });
51 None
52 } else {
53 Some(event)
54 }
55 }
56}
57
58impl TaskTransform<Event> for TimedDedupe {
59 fn transform(
60 self: Box<Self>,
61 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
62 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
63 where
64 Self: 'static,
65 {
66 let mut inner = self;
67 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
68 }
69}