1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::{
4 common::{
5 CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
6 fill_default_fields_match,
7 },
8 timed_transform::TimedDedupe,
9 transform::Dedupe,
10};
11use crate::{
12 config::{
13 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14 TransformOutput,
15 },
16 schema,
17 transforms::Transform,
18};
19
20#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct DedupeConfig {
25 #[configurable(derived)]
26 #[serde(default)]
27 pub fields: Option<FieldMatchConfig>,
28
29 #[configurable(derived)]
30 #[serde(default = "default_cache_config")]
31 pub cache: CacheConfig,
32
33 #[configurable(derived)]
34 #[serde(default)]
35 pub time_settings: Option<TimedCacheConfig>,
36}
37
38impl GenerateConfig for DedupeConfig {
39 fn generate_config() -> toml::Value {
40 toml::Value::try_from(Self {
41 fields: None,
42 cache: default_cache_config(),
43 time_settings: None,
44 })
45 .unwrap()
46 }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "dedupe")]
51impl TransformConfig for DedupeConfig {
52 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
53 if let Some(time_config) = &self.time_settings {
54 Ok(Transform::event_task(TimedDedupe::new(
55 self.cache.num_events,
56 fill_default_fields_match(self.fields.as_ref()),
57 time_config.clone(),
58 )))
59 } else {
60 Ok(Transform::event_task(Dedupe::new(
61 self.cache.num_events,
62 fill_default_fields_match(self.fields.as_ref()),
63 )))
64 }
65 }
66
67 fn input(&self) -> Input {
68 Input::log()
69 }
70
71 fn outputs(
72 &self,
73 _: &TransformContext,
74 input_definitions: &[(OutputId, schema::Definition)],
75 ) -> Vec<TransformOutput> {
76 vec![TransformOutput::new(
77 DataType::Log,
78 clone_input_definitions(input_definitions),
79 )]
80 }
81}
82
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds a config that compares events only on `fields`, with `cache.num_events`
    /// set to `num_events`.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds a config that compares events on every field except `given_fields`,
    /// with `cache.num_events` set to `num_events`.
    ///
    /// `message` and `timestamp` are always ignored too, so that (as with the match
    /// configs) only the fields each test inserts take part in the comparison.
    /// Presumably `LogEvent::from` adds a per-event `timestamp` that would otherwise
    /// make every event unique.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    /// Sets the metadata that an event leaving the test topology is expected to carry,
    /// so the event that was sent can be compared with what the transform emitted.
    fn stamp_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        // dedupe passes its input schema definition through to the output.
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// Checks that only `first_path` decides duplication: a differing `first_path`
    /// value is forwarded, while a repeated `first_path` value is dropped even though
    /// `second_path` differs.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in `first_path`.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in `second_path`.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // The first event is always forwarded.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // A new `first_path` value makes event2 distinct, so it is forwarded.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // event3 repeats event1's `first_path` value, so it must be dropped; the
            // final `None` check proves nothing else was emitted.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Checks that the same value stored under different field names is not treated
    /// as a duplicate.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Same value, different field name: must still be forwarded.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Checks that the order in which fields were inserted does not affect duplicate
    /// detection.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Same fields and values as event1, inserted in the opposite order.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // event2 is a duplicate of event1 and must be dropped.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// With a single-entry cache (`num_events == 1` at both call sites), checks that a
    /// previously seen event is forwarded again once a different event has replaced it
    /// in the cache.
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // event2 took event1's place in the cache, so event1 is forwarded again.
            // event1 already carries the expected metadata from the first round.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Checks time-based expiry. The callers configure `max_age_ms` as 100 ms and the
    /// sleep below is just over that: a repeat inside the window is dropped, and a
    /// repeat after the window is forwarded again.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Repeat within the window: dropped.
            tx.send(event1.clone()).await.unwrap();

            tokio::time::sleep(Duration::from_millis(101)).await;

            // Repeat after the window: forwarded again. event1 already carries the
            // expected metadata from the first round.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Checks that values of different types (the string `"123"` versus the integer
    /// `123`) are not treated as duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Like [`type_matching`], but the differently-typed values sit inside a nested
    /// object.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Checks that a field explicitly set to `null` is distinguished from a field that
    /// is absent.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            // No "matched" field at all.
            let mut event2 = Event::Log(LogEvent::from("message"));

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}
640}