vector/transforms/dedupe/
config.rs

1use vector_lib::{
2    config::{clone_input_definitions, LogNamespace},
3    configurable::configurable_component,
4};
5
6use crate::{
7    config::{
8        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
9        TransformOutput,
10    },
11    schema,
12    transforms::Transform,
13};
14
15use super::{
16    common::{
17        default_cache_config, fill_default_fields_match, CacheConfig, FieldMatchConfig,
18        TimedCacheConfig,
19    },
20    timed_transform::TimedDedupe,
21    transform::Dedupe,
22};
23
/// Configuration for the `dedupe` transform.
#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DedupeConfig {
    // Which fields decide whether two events are duplicates. When omitted
    // (`None`), `build` passes `None` to `fill_default_fields_match` and uses
    // whatever default it produces.
    #[configurable(derived)]
    #[serde(default)]
    pub fields: Option<FieldMatchConfig>,

    // Cache settings; `cache.num_events` is handed to the deduper as the size
    // of its cache. Falls back to `default_cache_config()` when omitted.
    #[configurable(derived)]
    #[serde(default = "default_cache_config")]
    pub cache: CacheConfig,

    // Optional time-based settings. When present, `build` constructs a
    // `TimedDedupe` (given a clone of these settings) instead of a `Dedupe`.
    #[configurable(derived)]
    #[serde(default)]
    pub time_settings: Option<TimedCacheConfig>,
}
41
42impl GenerateConfig for DedupeConfig {
43    fn generate_config() -> toml::Value {
44        toml::Value::try_from(Self {
45            fields: None,
46            cache: default_cache_config(),
47            time_settings: None,
48        })
49        .unwrap()
50    }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde(name = "dedupe")]
55impl TransformConfig for DedupeConfig {
56    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
57        if let Some(time_config) = &self.time_settings {
58            Ok(Transform::event_task(TimedDedupe::new(
59                self.cache.num_events,
60                fill_default_fields_match(self.fields.as_ref()),
61                time_config.clone(),
62            )))
63        } else {
64            Ok(Transform::event_task(Dedupe::new(
65                self.cache.num_events,
66                fill_default_fields_match(self.fields.as_ref()),
67            )))
68        }
69    }
70
71    fn input(&self) -> Input {
72        Input::log()
73    }
74
75    fn outputs(
76        &self,
77        _: vector_lib::enrichment::TableRegistry,
78        input_definitions: &[(OutputId, schema::Definition)],
79        _: LogNamespace,
80    ) -> Vec<TransformOutput> {
81        vec![TransformOutput::new(
82            DataType::Log,
83            clone_input_definitions(input_definitions),
84        )]
85    }
86}
87
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vector_lib::config::OutputId;
    use vector_lib::lookup::lookup_v2::ConfigTargetPath;

    use crate::config::schema::Definition;
    use crate::transforms::dedupe::common::TimedCacheConfig;
    use crate::{
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Stamps `event` with the metadata the test topology attaches to every
    /// event it forwards: source id `in`, upstream id `transform`, and the
    /// legacy-namespace schema definition. Afterwards the event can be
    /// compared with what comes out of the transform.
    ///
    /// It is also applied to events that were already stamped; the same values
    /// are simply set again.
    fn add_topology_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        // the schema definition is copied from the source for dedupe
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // "message" and "timestamp" are added automatically to all Events
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// Test that only the field at `first_path` takes part in duplicate
    /// detection, while the field at `second_path` is disregarded.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Test that matched field is considered
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Test that unmatched field isn't considered
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event differs in matched field so should be output even though it
            // has the same value for unmatched field.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event has the same value for "matched" as first event, so it should be dropped.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Test that the same value stored under different field names is not
    /// considered a duplicate.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event has a different matched field name with the same value,
            // so it should not be considered a dupe
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Test that two Events that are considered duplicates get handled that
    /// way, even if the order of the matched fields is different between the
    /// two.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Add fields in opposite order
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event is the same just with different field order, so it
            // shouldn't be output.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Test the eviction behavior of the underlying LruCache
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event gets output because it's not a dupe. This causes the first
            // Event to be evicted from the cache.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event is a dupe but gets output anyway because the first
            // event has aged out of the cache.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Test the time-based expiry of the timed cache: a duplicate arriving
    /// within `max_age_ms` of the original is dropped, while one arriving
    /// after that window has passed is forwarded again.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second time the event gets dropped because it's a dupe.
            tx.send(event1.clone()).await.unwrap();

            tokio::time::sleep(Duration::from_millis(101)).await;

            // Third time the event is a dupe but enough time has passed to accept it.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Test that two events with values for the matched fields that have
    /// different types but the same string representation aren't considered
    /// duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Test that two events where the matched field is a sub object and that
    /// object contains values that have different types but the same string
    /// representation aren't considered duplicates.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Test an explicit null vs a field being missing are treated as different.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            let mut event2 = Event::Log(LogEvent::from("message"));

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through as null is different than
            // missing
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            add_topology_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}