vector/transforms/dedupe/
transform.rs1use std::{future::ready, num::NonZeroUsize, pin::Pin};
2
3use bytes::Bytes;
4use futures::{Stream, StreamExt};
5use lru::LruCache;
6use vector_lib::lookup::lookup_v2::ConfigTargetPath;
7use vrl::path::OwnedTargetPath;
8
9use crate::{
10 event::{Event, Value},
11 internal_events::DedupeEventsDropped,
12 transforms::TaskTransform,
13};
14
15use super::common::FieldMatchConfig;
16
/// Stateful transform that drops events whose cache key is already present
/// in an LRU cache of previously seen keys.
#[derive(Clone)]
pub struct Dedupe {
    /// Selects which fields are folded into each event's cache key.
    fields: FieldMatchConfig,
    /// Keys of previously seen events. The `bool` value is always written as
    /// `true` and never inspected; only key presence matters.
    cache: LruCache<CacheEntry, bool>,
}
22
/// Small integer tag identifying a `Value` variant; produced by
/// `type_id_for_value` and stored inside `CacheEntry` keys.
type TypeId = u8;
24
/// Key stored in the dedupe LRU cache. Which variant is built depends on the
/// `FieldMatchConfig` variant in use (see `build_cache_entry`).
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum CacheEntry {
    /// Built for `FieldMatchConfig::MatchFields`: one slot per configured
    /// field, in configuration order. A slot is `None` when the event lacks
    /// that field; otherwise it holds the value's type id and its byte
    /// coercion.
    Match(Vec<Option<(TypeId, Bytes)>>),
    /// Built for `FieldMatchConfig::IgnoreFields`: one
    /// `(path, type id, bytes)` tuple for every event or metadata field that
    /// is not in the ignore list.
    Ignore(Vec<(OwnedTargetPath, TypeId, Bytes)>),
}
53
54const fn type_id_for_value(val: &Value) -> TypeId {
56 match val {
57 Value::Bytes(_) => 0,
58 Value::Timestamp(_) => 1,
59 Value::Integer(_) => 2,
60 Value::Float(_) => 3,
61 Value::Boolean(_) => 4,
62 Value::Object(_) => 5,
63 Value::Array(_) => 6,
64 Value::Null => 7,
65 Value::Regex(_) => 8,
66 }
67}
68
69impl Dedupe {
70 pub fn new(num_entries: NonZeroUsize, fields: FieldMatchConfig) -> Self {
71 Self {
72 fields,
73 cache: LruCache::new(num_entries),
74 }
75 }
76
77 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
78 let cache_entry = build_cache_entry(&event, &self.fields);
79 if self.cache.put(cache_entry, true).is_some() {
80 emit!(DedupeEventsDropped { count: 1 });
81 None
82 } else {
83 Some(event)
84 }
85 }
86}
87
88pub(crate) fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
92 match &fields {
93 FieldMatchConfig::MatchFields(fields) => {
94 let mut entry = Vec::new();
95 for field_name in fields.iter() {
96 if let Some(value) = event.as_log().get(field_name) {
97 entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
98 } else {
99 entry.push(None);
100 }
101 }
102 CacheEntry::Match(entry)
103 }
104 FieldMatchConfig::IgnoreFields(fields) => {
105 let mut entry = Vec::new();
106
107 if let Some(event_fields) = event.as_log().all_event_fields() {
108 if let Some(metadata_fields) = event.as_log().all_metadata_fields() {
109 for (field_name, value) in event_fields.chain(metadata_fields) {
110 if let Ok(path) = ConfigTargetPath::try_from(field_name) {
111 if !fields.contains(&path) {
112 entry.push((
113 path.0,
114 type_id_for_value(value),
115 value.coerce_to_bytes(),
116 ));
117 }
118 }
119 }
120 }
121 }
122
123 CacheEntry::Ignore(entry)
124 }
125 }
126}
127
128impl TaskTransform<Event> for Dedupe {
129 fn transform(
130 self: Box<Self>,
131 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
132 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
133 where
134 Self: 'static,
135 {
136 let mut inner = self;
137 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
138 }
139}