// vector/transforms/dedupe/config.rs
1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::{
4    common::{
5        CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
6        fill_default_fields_match,
7    },
8    timed_transform::TimedDedupe,
9    transform::Dedupe,
10};
11use crate::{
12    config::{
13        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14        TransformOutput,
15    },
16    schema,
17    transforms::Transform,
18};
19
20/// Configuration for the `dedupe` transform.
21#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct DedupeConfig {
25    #[configurable(derived)]
26    #[serde(default)]
27    pub fields: Option<FieldMatchConfig>,
28
29    #[configurable(derived)]
30    #[serde(default = "default_cache_config")]
31    pub cache: CacheConfig,
32
33    #[configurable(derived)]
34    #[serde(default)]
35    pub time_settings: Option<TimedCacheConfig>,
36}
37
38impl GenerateConfig for DedupeConfig {
39    fn generate_config() -> toml::Value {
40        toml::Value::try_from(Self {
41            fields: None,
42            cache: default_cache_config(),
43            time_settings: None,
44        })
45        .unwrap()
46    }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "dedupe")]
51impl TransformConfig for DedupeConfig {
52    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
53        if let Some(time_config) = &self.time_settings {
54            Ok(Transform::event_task(TimedDedupe::new(
55                self.cache.num_events,
56                fill_default_fields_match(self.fields.as_ref()),
57                time_config.clone(),
58            )))
59        } else {
60            Ok(Transform::event_task(Dedupe::new(
61                self.cache.num_events,
62                fill_default_fields_match(self.fields.as_ref()),
63            )))
64        }
65    }
66
67    fn input(&self) -> Input {
68        Input::log()
69    }
70
71    fn outputs(
72        &self,
73        _: &TransformContext,
74        input_definitions: &[(OutputId, schema::Definition)],
75    ) -> Vec<TransformOutput> {
76        vec![TransformOutput::new(
77            DataType::Log,
78            clone_input_definitions(input_definitions),
79        )]
80    }
81}
82
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    const TEST_SOURCE_COMPONENT_ID: &str = "in";
    const TEST_UPSTREAM_COMPONENT_ID: &str = "transform";
    const TEST_SOURCE_TYPE: &str = "unit_test_stream";

    /// Applies the metadata that events coming out of the test topology are
    /// expected to carry, so an input event can be compared with its output.
    fn set_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from(TEST_SOURCE_COMPONENT_ID)));
        event.set_upstream_id(Arc::new(OutputId::from(TEST_UPSTREAM_COMPONENT_ID)));
        event.set_source_type(TEST_SOURCE_TYPE);
        // The schema definition is copied from the source for dedupe.
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds an untimed config that deduplicates on exactly `fields`.
    ///
    /// Panics if `num_events` is zero.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds an untimed config that deduplicates on every field except
    /// `message`, `timestamp` and `given_fields`.
    ///
    /// Panics if `num_events` is zero.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // "message" and "timestamp" are added automatically to all Events
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// Checks that `first_path` takes part in deduplication and `second_path`
    /// does not.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in the considered field (`first_path`),
            // so it must not be treated as a duplicate.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in the ignored field (`second_path`),
            // so it must be treated as a duplicate of event1.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event differs in the considered field, so it is output even
            // though it has the same value for the ignored field.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Third event has the same value for the considered field as the
            // first event, so it should be dropped.
            tx.send(event3.clone()).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Checks that the same value stored under different field names does not
    /// make two events duplicates.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event has a different matched field name with the same value,
            // so it should not be considered a dupe
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Test that two Events that are considered duplicates get handled that
    /// way, even if the order of the matched fields is different between the
    /// two.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Add fields in opposite order
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event is the same just with different field order, so it
            // shouldn't be output.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        // Construct transform with a cache size of only 1 entry.
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Test the eviction behavior of the underlying LruCache
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event gets output because it's not a dupe. This causes the first
            // Event to be evicted from the cache.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);

            assert_eq!(new_event, event2);

            // Third event is a dupe but gets output anyway because the first
            // event has aged out of the cache.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        // Construct transform with timed cache
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Test the time-based expiry of the timed cache: a duplicate is dropped
    /// while the cached entry is younger than `max_age_ms`, and accepted again
    /// once that much time has passed.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second time the event gets dropped because it's a dupe.
            tx.send(event1.clone()).await.unwrap();

            // Wait just past the configured 100ms max age.
            tokio::time::sleep(Duration::from_millis(101)).await;

            // Third time the event is a dupe but enough time has passed to accept it.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Test that two events with values for the matched fields that have
    /// different types but the same string representation aren't considered
    /// duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Test that two events where the matched field is a sub object and that
    /// object contains values that have different types but the same string
    /// representation aren't considered duplicates.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through even though the string
            // representations of "matched" are the same.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Test an explicit null vs a field being missing are treated as different.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            let mut event2 = Event::Log(LogEvent::from("message"));

            // First event should always be passed through as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Second event should also get passed through as null is different than
            // missing
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}