vector/transforms/dedupe/
config.rs

1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::{
4    common::{
5        CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
6        fill_default_fields_match,
7    },
8    timed_transform::TimedDedupe,
9    transform::Dedupe,
10};
11use crate::{
12    config::{
13        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14        TransformOutput,
15    },
16    schema,
17    transforms::Transform,
18};
19
/// Configuration for the `dedupe` transform.
// Registered as the `dedupe` transform via `configurable_component`; unknown keys in the
// user's configuration are rejected by `deny_unknown_fields`.
//
// NOTE(review): the fields below are annotated `#[configurable(derived)]`, so their
// user-facing descriptions presumably come from the field types. Plain `//` comments are
// used here (rather than `///`) so the generated documentation is not affected — confirm.
#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DedupeConfig {
    // Optional field-matching rules. `build` hands this (as `Option<&FieldMatchConfig>`) to
    // `fill_default_fields_match`, which supplies the value used when it is `None`.
    #[configurable(derived)]
    #[serde(default)]
    pub fields: Option<FieldMatchConfig>,

    // Cache settings; `cache.num_events` is passed to the transform constructor in `build`.
    // Falls back to `default_cache_config()` when omitted from the configuration.
    #[configurable(derived)]
    #[serde(default = "default_cache_config")]
    pub cache: CacheConfig,

    // Optional time-based cache settings. When present, `build` creates a `TimedDedupe`
    // instead of a plain `Dedupe`.
    #[configurable(derived)]
    #[serde(default)]
    pub time_settings: Option<TimedCacheConfig>,
}
37
38impl GenerateConfig for DedupeConfig {
39    fn generate_config() -> toml::Value {
40        toml::Value::try_from(Self {
41            fields: None,
42            cache: default_cache_config(),
43            time_settings: None,
44        })
45        .unwrap()
46    }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "dedupe")]
51impl TransformConfig for DedupeConfig {
52    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
53        if let Some(time_config) = &self.time_settings {
54            Ok(Transform::event_task(TimedDedupe::new(
55                self.cache.num_events,
56                fill_default_fields_match(self.fields.as_ref()),
57                time_config.clone(),
58            )))
59        } else {
60            Ok(Transform::event_task(Dedupe::new(
61                self.cache.num_events,
62                fill_default_fields_match(self.fields.as_ref()),
63            )))
64        }
65    }
66
67    fn input(&self) -> Input {
68        Input::log()
69    }
70
71    fn outputs(
72        &self,
73        _: &TransformContext,
74        input_definitions: &[(OutputId, schema::Definition)],
75    ) -> Vec<TransformOutput> {
76        vec![TransformOutput::new(
77            DataType::Log,
78            clone_input_definitions(input_definitions),
79        )]
80    }
81}
82
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    /// Maximum entry age configured by the timed-cache tests. `timed_age_out` sleeps for
    /// slightly longer than this, so the two values must stay in sync.
    const TIMED_MAX_AGE: Duration = Duration::from_millis(100);

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds a `CacheConfig` holding `num_events` entries.
    ///
    /// Panics if `num_events` is zero.
    const fn cache_config(num_events: usize) -> CacheConfig {
        CacheConfig {
            num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
        }
    }

    /// Builds a non-timed config that compares only the given `fields`.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: cache_config(num_events),
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds a non-timed config that ignores `given_fields` in addition to `message` and
    /// `timestamp`.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // "message" and "timestamp" are added automatically to all Events
        let mut fields: Vec<ConfigTargetPath> = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: cache_config(num_events),
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    /// Returns `config` with a timed cache of `TIMED_MAX_AGE` and `refresh_on_drop`
    /// disabled.
    fn with_timed_cache(config: DedupeConfig) -> DedupeConfig {
        DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: TIMED_MAX_AGE,
                refresh_on_drop: false,
            }),
            ..config
        }
    }

    /// Creates the log event every test starts from.
    fn base_event() -> Event {
        Event::Log(LogEvent::from("message"))
    }

    /// Stamps `event` with the metadata the tests expect on events coming out of the test
    /// topology: source id `in`, upstream id `transform`, and the default legacy-namespace
    /// schema definition.
    fn stamp_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        // the schema definition is copied from the source for dedupe
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// Shared body of the basic tests. `first_path` is the field that takes part in the
    /// comparison; `second_path` is the one that does not.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in the compared field: must not be deduplicated even
            // though the non-compared field is identical.
            let mut event2 = base_event();
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in the non-compared field: must be deduplicated.
            let mut event3 = base_event();
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event differs in matched field so should be output even though it
            // has the same value for unmatched field.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event has the same value for "matched" as first event, so it should be dropped.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Test that the same value stored under two different field names does not make two
    /// events duplicates of each other.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = base_event();
            event2.as_mut_log().insert("matched2", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event has a different matched field name with the same value,
            // so it should not be considered a dupe
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Test that two Events that are considered duplicates get handled that
    /// way, even if the order of the matched fields is different between the
    /// two.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Add fields in opposite order
            let mut event2 = base_event();
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event is the same just with different field order, so it
            // shouldn't be output.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Test the eviction behavior of the underlying LruCache
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = base_event();
            event2.as_mut_log().insert("matched", "some value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event gets output because it's not a dupe. This causes the first
            // Event to be evicted from the cache.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event is a dupe but gets output anyway because the first
            // event has aged out of the cache.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            // `event1` was already stamped above; the source id is set again to the same
            // value before comparing.
            event1.set_source_id(Arc::new(ComponentKey::from("in")));
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        // Construct transform with timed cache
        let transform_config =
            with_timed_cache(make_match_transform_config(5, vec!["matched".into()]));
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = with_timed_cache(make_ignore_transform_config(1, vec![]));
        timed_age_out(transform_config).await;
    }

    /// Test the time-based expiry of the timed cache: a duplicate is dropped while the
    /// original entry is younger than `TIMED_MAX_AGE`, and let through once that much time
    /// has passed.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second time the event gets dropped because it's a dupe.
            tx.send(event1.clone()).await.unwrap();

            // Wait just past the configured maximum age.
            tokio::time::sleep(TIMED_MAX_AGE + Duration::from_millis(1)).await;

            // Third time the event is a dupe but enough time has passed to accept it.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            // `event1` was already stamped above; the source id is set again to the same
            // value before comparing.
            event1.set_source_id(Arc::new(ComponentKey::from("in")));
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Test that two events with values for the matched fields that have
    /// different types but the same string representation aren't considered
    /// duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = base_event();
            event2.as_mut_log().insert("matched", 123);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Test that two events where the matched field is a sub object and that
    /// object contains values that have different types but the same string
    /// representation aren't considered duplicates.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = base_event();
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = base_event();
            event2.as_mut_log().insert("matched", map2);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Test an explicit null vs a field being missing are treated as different.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = base_event();
            event1.as_mut_log().insert("matched", Value::Null);

            // No "matched" field at all.
            let mut event2 = base_event();

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through as null is different than
            // missing
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}