vector/transforms/dedupe/
transform.rs1use std::{future::ready, num::NonZeroUsize, pin::Pin};
2
3use bytes::Bytes;
4use futures::{Stream, StreamExt};
5use lru::LruCache;
6use vector_lib::lookup::lookup_v2::ConfigTargetPath;
7use vrl::path::OwnedTargetPath;
8
9use super::common::FieldMatchConfig;
10use crate::{
11 event::{Event, Value},
12 internal_events::DedupeEventsDropped,
13 transforms::TaskTransform,
14};
15
16#[derive(Clone)]
17pub struct Dedupe {
18 fields: FieldMatchConfig,
19 cache: LruCache<CacheEntry, bool>,
20}
21
22type TypeId = u8;
23
24#[derive(Clone, PartialEq, Eq, Hash)]
48pub(crate) enum CacheEntry {
49 Match(Vec<Option<(TypeId, Bytes)>>),
50 Ignore(Vec<(OwnedTargetPath, TypeId, Bytes)>),
51}
52
53const fn type_id_for_value(val: &Value) -> TypeId {
55 match val {
56 Value::Bytes(_) => 0,
57 Value::Timestamp(_) => 1,
58 Value::Integer(_) => 2,
59 Value::Float(_) => 3,
60 Value::Boolean(_) => 4,
61 Value::Object(_) => 5,
62 Value::Array(_) => 6,
63 Value::Null => 7,
64 Value::Regex(_) => 8,
65 }
66}
67
68impl Dedupe {
69 pub fn new(num_entries: NonZeroUsize, fields: FieldMatchConfig) -> Self {
70 Self {
71 fields,
72 cache: LruCache::new(num_entries),
73 }
74 }
75
76 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
77 let cache_entry = build_cache_entry(&event, &self.fields);
78 if self.cache.put(cache_entry, true).is_some() {
79 emit!(DedupeEventsDropped { count: 1 });
80 None
81 } else {
82 Some(event)
83 }
84 }
85}
86
87pub(crate) fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
91 match &fields {
92 FieldMatchConfig::MatchFields(fields) => {
93 let mut entry = Vec::new();
94 for field_name in fields.iter() {
95 if let Some(value) = event.as_log().get(field_name) {
96 entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
97 } else {
98 entry.push(None);
99 }
100 }
101 CacheEntry::Match(entry)
102 }
103 FieldMatchConfig::IgnoreFields(fields) => {
104 let mut entry = Vec::new();
105
106 if let Some(event_fields) = event.as_log().all_event_fields()
107 && let Some(metadata_fields) = event.as_log().all_metadata_fields()
108 {
109 for (field_name, value) in event_fields.chain(metadata_fields) {
110 if let Ok(path) = ConfigTargetPath::try_from(field_name)
111 && !fields.contains(&path)
112 {
113 entry.push((path.0, type_id_for_value(value), value.coerce_to_bytes()));
114 }
115 }
116 }
117
118 CacheEntry::Ignore(entry)
119 }
120 }
121}
122
123impl TaskTransform<Event> for Dedupe {
124 fn transform(
125 self: Box<Self>,
126 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
127 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
128 where
129 Self: 'static,
130 {
131 let mut inner = self;
132 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
133 }
134}