vector/transforms/dedupe/
config.rs

1use vector_lib::{
2    config::{LogNamespace, clone_input_definitions},
3    configurable::configurable_component,
4};
5
6use super::{
7    common::{
8        CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
9        fill_default_fields_match,
10    },
11    timed_transform::TimedDedupe,
12    transform::Dedupe,
13};
14use crate::{
15    config::{
16        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
17        TransformOutput,
18    },
19    schema,
20    transforms::Transform,
21};
22
/// Configuration for the `dedupe` transform.
// NOTE(review): `///` comments on this type/fields are presumably consumed by
// `configurable_component` as schema descriptions, so reviewer notes here use `//`.
#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DedupeConfig {
    // Which fields are compared (or ignored) when deciding whether two events
    // are duplicates. `None` is resolved by `fill_default_fields_match` in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub fields: Option<FieldMatchConfig>,

    // Cache settings; `num_events` is passed to `Dedupe::new` / `TimedDedupe::new`
    // as the number of remembered events. Defaults to `default_cache_config()`.
    #[configurable(derived)]
    #[serde(default = "default_cache_config")]
    pub cache: CacheConfig,

    // When `Some`, `build` constructs a `TimedDedupe` using these settings
    // instead of a plain `Dedupe`.
    #[configurable(derived)]
    #[serde(default)]
    pub time_settings: Option<TimedCacheConfig>,
}
40
41impl GenerateConfig for DedupeConfig {
42    fn generate_config() -> toml::Value {
43        toml::Value::try_from(Self {
44            fields: None,
45            cache: default_cache_config(),
46            time_settings: None,
47        })
48        .unwrap()
49    }
50}
51
52#[async_trait::async_trait]
53#[typetag::serde(name = "dedupe")]
54impl TransformConfig for DedupeConfig {
55    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
56        if let Some(time_config) = &self.time_settings {
57            Ok(Transform::event_task(TimedDedupe::new(
58                self.cache.num_events,
59                fill_default_fields_match(self.fields.as_ref()),
60                time_config.clone(),
61            )))
62        } else {
63            Ok(Transform::event_task(Dedupe::new(
64                self.cache.num_events,
65                fill_default_fields_match(self.fields.as_ref()),
66            )))
67        }
68    }
69
70    fn input(&self) -> Input {
71        Input::log()
72    }
73
74    fn outputs(
75        &self,
76        _: vector_lib::enrichment::TableRegistry,
77        input_definitions: &[(OutputId, schema::Definition)],
78        _: LogNamespace,
79    ) -> Vec<TransformOutput> {
80        vec![TransformOutput::new(
81            DataType::Log,
82            clone_input_definitions(input_definitions),
83        )]
84    }
85}
86
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Config that compares only `fields` when detecting duplicates.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Config that compares every field except `given_fields` (plus the
    /// automatically added `message` and `timestamp`).
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // "message" and "timestamp" are added automatically to all Events
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    /// Stamps `event` in place with the metadata the test topology attaches to
    /// events emitted by the transform (source id `in`, upstream id
    /// `transform`, legacy-namespace schema definition), so it can be compared
    /// with `assert_eq!` against the transform's output. Idempotent.
    fn add_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        // the schema definition is copied from the source for dedupe
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// `first_path` is a field the config compares; `second_path` is one it
    /// excludes from comparison.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Test that the compared field is considered: only `first_path` differs.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Test that the excluded field isn't considered: only `second_path` differs.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event differs in the compared field so should be output even though it
            // has the same value for the excluded field.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event has the same value for the compared field as the first event, so it
            // should be dropped.
            tx.send(event3.clone()).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Two events with the same value under different field names are distinct.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event has a different matched field name with the same value,
            // so it should not be considered a dupe
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Test that two Events that are considered duplicates get handled that
    /// way, even if the order of the matched fields is different between the
    /// two.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Add fields in opposite order
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event is the same just with different field order, so it
            // shouldn't be output.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Test the eviction behavior of the underlying LruCache
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event gets output because it's not a dupe. This causes the first
            // Event to be evicted from the cache.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event is a dupe but gets output anyway because the first
            // event has aged out of the cache. `event1` already carries the
            // expected metadata from the first assertion.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Test the expiry behavior of the timed cache: a duplicate is dropped
    /// while the cached entry is younger than `max_age_ms` (100ms in the
    /// callers) and accepted again once it is older.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second time the event gets dropped because it's a dupe.
            tx.send(event1.clone()).await.unwrap();

            tokio::time::sleep(Duration::from_millis(101)).await;

            // Third time the event is a dupe but enough time has passed to accept it.
            // `event1` already carries the expected metadata.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Test that two events with values for the matched fields that have
    /// different types but the same string representation aren't considered
    /// duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Test that two events where the matched field is a sub object and that
    /// object contains values that have different types but the same string
    /// representation aren't considered duplicates.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Test an explicit null vs a field being missing are treated as different.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            let mut event2 = Event::Log(LogEvent::from("message"));

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through as null is different than
            // missing
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}