vector/transforms/dedupe/
transform.rs

1use std::{future::ready, num::NonZeroUsize, pin::Pin};
2
3use bytes::Bytes;
4use futures::{Stream, StreamExt};
5use lru::LruCache;
6use vector_lib::lookup::lookup_v2::ConfigTargetPath;
7use vrl::path::OwnedTargetPath;
8
9use crate::{
10    event::{Event, Value},
11    internal_events::DedupeEventsDropped,
12    transforms::TaskTransform,
13};
14
15use super::common::FieldMatchConfig;
16
/// State for the dedupe transform.
///
/// Pairs the field-matching configuration used to build cache keys with an
/// LRU cache of the keys built from events processed so far.
#[derive(Clone)]
pub struct Dedupe {
    /// Determines which fields of an event contribute to its cache key
    /// (consumed by `build_cache_entry`).
    fields: FieldMatchConfig,
    /// Cache keys of processed events. Only key presence is consulted; the
    /// stored `bool` is always `true`.
    cache: LruCache<CacheEntry, bool>,
}
22
/// Small numeric tag recording which `Value` variant a cached field held
/// (assigned by `type_id_for_value`).
type TypeId = u8;
24
/// A CacheEntry comes in two forms, depending on the FieldMatchConfig in use.
///
/// When matching fields, a CacheEntry contains a vector of optional 2-tuples.
/// Each element in the vector represents one field in the corresponding
/// LogEvent. Elements in the vector correspond 1:1 (and in order) to the
/// fields specified in "fields.match". Each tuple stores the TypeId of the
/// field and the field's data as Bytes. There is no need to store the field
/// name because the elements of the vector correspond 1:1 to "fields.match",
/// so there is never any ambiguity about which field is being referred to. If
/// a field from "fields.match" does not show up in an incoming Event, the
/// CacheEntry will have None in the corresponding location in the vector.
///
/// When ignoring fields, a CacheEntry contains a vector of 3-tuples. Each
/// element in the vector represents one field in the corresponding LogEvent.
/// The tuples each contain the field name (as an OwnedTargetPath), the TypeId,
/// and the data as Bytes for the corresponding field (in that order). Since
/// the set of fields that might go into CacheEntries is not known at startup,
/// we must store the field names as part of CacheEntries. Since Event objects
/// store their fields in alphabetic order (as they are backed by a BTreeMap),
/// and we build CacheEntries by iterating over the fields of the incoming
/// Events, we know that the CacheEntries for 2 equivalent events will always
/// contain the fields in the same order.
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum CacheEntry {
    /// One slot per field listed in "fields.match", in configuration order;
    /// `None` marks a field that was missing from the event.
    Match(Vec<Option<(TypeId, Bytes)>>),
    /// One element per event or metadata field that is not on the ignore
    /// list, in iteration order.
    Ignore(Vec<(OwnedTargetPath, TypeId, Bytes)>),
}
53
54/// Assigns a unique number to each of the types supported by Event::Value.
55const fn type_id_for_value(val: &Value) -> TypeId {
56    match val {
57        Value::Bytes(_) => 0,
58        Value::Timestamp(_) => 1,
59        Value::Integer(_) => 2,
60        Value::Float(_) => 3,
61        Value::Boolean(_) => 4,
62        Value::Object(_) => 5,
63        Value::Array(_) => 6,
64        Value::Null => 7,
65        Value::Regex(_) => 8,
66    }
67}
68
69impl Dedupe {
70    pub fn new(num_entries: NonZeroUsize, fields: FieldMatchConfig) -> Self {
71        Self {
72            fields,
73            cache: LruCache::new(num_entries),
74        }
75    }
76
77    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
78        let cache_entry = build_cache_entry(&event, &self.fields);
79        if self.cache.put(cache_entry, true).is_some() {
80            emit!(DedupeEventsDropped { count: 1 });
81            None
82        } else {
83            Some(event)
84        }
85    }
86}
87
88/// Takes in an Event and returns a CacheEntry to place into the LRU cache
89/// containing all relevant information for the fields that need matching
90/// against according to the specified FieldMatchConfig.
91pub(crate) fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
92    match &fields {
93        FieldMatchConfig::MatchFields(fields) => {
94            let mut entry = Vec::new();
95            for field_name in fields.iter() {
96                if let Some(value) = event.as_log().get(field_name) {
97                    entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
98                } else {
99                    entry.push(None);
100                }
101            }
102            CacheEntry::Match(entry)
103        }
104        FieldMatchConfig::IgnoreFields(fields) => {
105            let mut entry = Vec::new();
106
107            if let Some(event_fields) = event.as_log().all_event_fields() {
108                if let Some(metadata_fields) = event.as_log().all_metadata_fields() {
109                    for (field_name, value) in event_fields.chain(metadata_fields) {
110                        if let Ok(path) = ConfigTargetPath::try_from(field_name) {
111                            if !fields.contains(&path) {
112                                entry.push((
113                                    path.0,
114                                    type_id_for_value(value),
115                                    value.coerce_to_bytes(),
116                                ));
117                            }
118                        }
119                    }
120                }
121            }
122
123            CacheEntry::Ignore(entry)
124        }
125    }
126}
127
128impl TaskTransform<Event> for Dedupe {
129    fn transform(
130        self: Box<Self>,
131        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
132    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
133    where
134        Self: 'static,
135    {
136        let mut inner = self;
137        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
138    }
139}