vector/transforms/dedupe/
transform.rs

1use std::{future::ready, num::NonZeroUsize, pin::Pin};
2
3use bytes::Bytes;
4use futures::{Stream, StreamExt};
5use lru::LruCache;
6use vector_lib::lookup::lookup_v2::ConfigTargetPath;
7use vrl::path::OwnedTargetPath;
8
9use super::common::FieldMatchConfig;
10use crate::{
11    event::{Event, Value},
12    internal_events::DedupeEventsDropped,
13    transforms::TaskTransform,
14};
15
/// State for the dedupe transform: drops events whose cache key has already
/// been seen.
///
/// The key for each event is produced by `build_cache_entry` from `fields`,
/// and previously seen keys are remembered in `cache`.
#[derive(Clone)]
pub struct Dedupe {
    /// Selects which fields contribute to an event's cache key.
    fields: FieldMatchConfig,
    /// LRU cache of keys for events already seen. The `bool` value carries no
    /// information; `transform_one` always stores `true`.
    cache: LruCache<CacheEntry, bool>,
}
21
/// Small numeric tag identifying which `Value` variant a cached field held
/// (see `type_id_for_value`).
type TypeId = u8;
23
/// A CacheEntry comes in two forms, depending on the FieldMatchConfig in use.
///
/// When matching fields, a CacheEntry contains a vector of optional 2-tuples.
/// Each element in the vector represents one field in the corresponding
/// LogEvent. Elements in the vector will correspond 1:1 (and in order) to the
/// fields specified in "fields.match". The tuples each store the TypeId for
/// this field and the data as Bytes for the field. There is no need to store
/// the field name because the elements of the vector correspond 1:1 to
/// "fields.match", so there is never any ambiguity about what field is being
/// referred to. If a field from "fields.match" does not show up in an incoming
/// Event, the CacheEntry will have None in the corresponding location in the
/// vector.
///
/// When ignoring fields, a CacheEntry contains a vector of 3-tuples. Each
/// element in the vector represents one field in the corresponding LogEvent.
/// The tuples will each contain the field name, TypeId, and data as Bytes for
/// the corresponding field (in that order). Since the set of fields that might
/// go into CacheEntries is not known at startup, we must store the field names
/// as part of CacheEntries. Since Event objects store their fields in
/// alphabetic order (as they are backed by a BTreeMap), and we build
/// CacheEntries by iterating over the fields of the incoming Events, we know
/// that the CacheEntries for 2 equivalent events will always contain the
/// fields in the same order.
#[derive(Clone, PartialEq, Eq, Hash)]
pub(crate) enum CacheEntry {
    /// One slot per configured match field, in configuration order; `None`
    /// marks a field that was absent from the event.
    Match(Vec<Option<(TypeId, Bytes)>>),
    /// One `(path, type id, bytes)` tuple per field that was not ignored.
    Ignore(Vec<(OwnedTargetPath, TypeId, Bytes)>),
}
52
53/// Assigns a unique number to each of the types supported by Event::Value.
54const fn type_id_for_value(val: &Value) -> TypeId {
55    match val {
56        Value::Bytes(_) => 0,
57        Value::Timestamp(_) => 1,
58        Value::Integer(_) => 2,
59        Value::Float(_) => 3,
60        Value::Boolean(_) => 4,
61        Value::Object(_) => 5,
62        Value::Array(_) => 6,
63        Value::Null => 7,
64        Value::Regex(_) => 8,
65    }
66}
67
68impl Dedupe {
69    pub fn new(num_entries: NonZeroUsize, fields: FieldMatchConfig) -> Self {
70        Self {
71            fields,
72            cache: LruCache::new(num_entries),
73        }
74    }
75
76    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
77        let cache_entry = build_cache_entry(&event, &self.fields);
78        if self.cache.put(cache_entry, true).is_some() {
79            emit!(DedupeEventsDropped { count: 1 });
80            None
81        } else {
82            Some(event)
83        }
84    }
85}
86
87/// Takes in an Event and returns a CacheEntry to place into the LRU cache
88/// containing all relevant information for the fields that need matching
89/// against according to the specified FieldMatchConfig.
90pub(crate) fn build_cache_entry(event: &Event, fields: &FieldMatchConfig) -> CacheEntry {
91    match &fields {
92        FieldMatchConfig::MatchFields(fields) => {
93            let mut entry = Vec::new();
94            for field_name in fields.iter() {
95                if let Some(value) = event.as_log().get(field_name) {
96                    entry.push(Some((type_id_for_value(value), value.coerce_to_bytes())));
97                } else {
98                    entry.push(None);
99                }
100            }
101            CacheEntry::Match(entry)
102        }
103        FieldMatchConfig::IgnoreFields(fields) => {
104            let mut entry = Vec::new();
105
106            if let Some(event_fields) = event.as_log().all_event_fields()
107                && let Some(metadata_fields) = event.as_log().all_metadata_fields()
108            {
109                for (field_name, value) in event_fields.chain(metadata_fields) {
110                    if let Ok(path) = ConfigTargetPath::try_from(field_name)
111                        && !fields.contains(&path)
112                    {
113                        entry.push((path.0, type_id_for_value(value), value.coerce_to_bytes()));
114                    }
115                }
116            }
117
118            CacheEntry::Ignore(entry)
119        }
120    }
121}
122
123impl TaskTransform<Event> for Dedupe {
124    fn transform(
125        self: Box<Self>,
126        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
127    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
128    where
129        Self: 'static,
130    {
131        let mut inner = self;
132        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
133    }
134}