1use vector_lib::{
2 config::{LogNamespace, clone_input_definitions},
3 configurable::configurable_component,
4};
5
6use super::{
7 common::{
8 CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
9 fill_default_fields_match,
10 },
11 timed_transform::TimedDedupe,
12 transform::Dedupe,
13};
14use crate::{
15 config::{
16 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
17 TransformOutput,
18 },
19 schema,
20 transforms::Transform,
21};
22
/// Configuration for the `dedupe` transform.
#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DedupeConfig {
    // Selection of the fields that take part in deduplication. When omitted (`None`),
    // `fill_default_fields_match` supplies the selection at build time.
    #[configurable(derived)]
    #[serde(default)]
    pub fields: Option<FieldMatchConfig>,

    // Cache settings; `num_events` is handed to the dedupe implementation at build time.
    // Falls back to `default_cache_config()` when absent from the config.
    #[configurable(derived)]
    #[serde(default = "default_cache_config")]
    pub cache: CacheConfig,

    // When set, `build` creates a `TimedDedupe` with a clone of these settings;
    // when absent, the untimed `Dedupe` is used instead.
    #[configurable(derived)]
    #[serde(default)]
    pub time_settings: Option<TimedCacheConfig>,
}
40
41impl GenerateConfig for DedupeConfig {
42 fn generate_config() -> toml::Value {
43 toml::Value::try_from(Self {
44 fields: None,
45 cache: default_cache_config(),
46 time_settings: None,
47 })
48 .unwrap()
49 }
50}
51
52#[async_trait::async_trait]
53#[typetag::serde(name = "dedupe")]
54impl TransformConfig for DedupeConfig {
55 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
56 if let Some(time_config) = &self.time_settings {
57 Ok(Transform::event_task(TimedDedupe::new(
58 self.cache.num_events,
59 fill_default_fields_match(self.fields.as_ref()),
60 time_config.clone(),
61 )))
62 } else {
63 Ok(Transform::event_task(Dedupe::new(
64 self.cache.num_events,
65 fill_default_fields_match(self.fields.as_ref()),
66 )))
67 }
68 }
69
70 fn input(&self) -> Input {
71 Input::log()
72 }
73
74 fn outputs(
75 &self,
76 _: vector_lib::enrichment::TableRegistry,
77 input_definitions: &[(OutputId, schema::Definition)],
78 _: LogNamespace,
79 ) -> Vec<TransformOutput> {
80 vec![TransformOutput::new(
81 DataType::Log,
82 clone_input_definitions(input_definitions),
83 )]
84 }
85}
86
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Stamps `event` with the metadata these tests expect on every event forwarded by the
    /// test topology (source `in`, upstream `transform`, legacy-namespace schema definition),
    /// so it can be compared directly with the transform's output.
    fn stamp_forwarded_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    /// Builds a match-mode config: only `fields` take part in deduplication.
    ///
    /// Panics if `num_events` is zero.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds an ignore-mode config: `message` and `timestamp` are always ignored, in
    /// addition to `given_fields`.
    ///
    /// Panics if `num_events` is zero.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// `first_path` takes part in deduplication and `second_path` does not, for every
    /// config passed in by the callers above.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in `first_path`.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from event1 only in `second_path`.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // First occurrence: forwarded.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Different value in a deduplicated field: forwarded.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // Only a non-participating field differs: dropped as a duplicate of event1.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// The same value stored under different field names must not be treated as a duplicate.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Inserting the same fields in a different order must still count as a duplicate.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Same content, different insertion order: dropped.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// With a single-entry cache (both callers pass `num_events = 1`), a newer event evicts
    /// the older one, so the older one is forwarded again when repeated.
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // event1 was evicted by event2, so it is forwarded again. It already carries the
            // expected forwarded metadata and can be compared as-is.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Both callers configure `max_age_ms = 100ms` with `refresh_on_drop = false`: a repeat
    /// within that window is dropped, and a repeat after it is forwarded again.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Repeat inside the 100ms window: dropped.
            tx.send(event1.clone()).await.unwrap();

            tokio::time::sleep(Duration::from_millis(101)).await;

            // Repeat after the window: forwarded again. event1 already carries the expected
            // forwarded metadata.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// A string `"123"` and an integer `123` must not be treated as duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Same as `type_matching`, but with the differing types nested inside an object.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// A field explicitly set to null must not be treated as equal to a missing field.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            let mut event2 = Event::Log(LogEvent::from("message"));

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            stamp_forwarded_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}