1use vector_lib::{
2 config::{clone_input_definitions, LogNamespace},
3 configurable::configurable_component,
4};
5
6use crate::{
7 config::{
8 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
9 TransformOutput,
10 },
11 schema,
12 transforms::Transform,
13};
14
15use super::{
16 common::{
17 default_cache_config, fill_default_fields_match, CacheConfig, FieldMatchConfig,
18 TimedCacheConfig,
19 },
20 timed_transform::TimedDedupe,
21 transform::Dedupe,
22};
23
24#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
26#[derive(Clone, Debug)]
27#[serde(deny_unknown_fields)]
28pub struct DedupeConfig {
29 #[configurable(derived)]
30 #[serde(default)]
31 pub fields: Option<FieldMatchConfig>,
32
33 #[configurable(derived)]
34 #[serde(default = "default_cache_config")]
35 pub cache: CacheConfig,
36
37 #[configurable(derived)]
38 #[serde(default)]
39 pub time_settings: Option<TimedCacheConfig>,
40}
41
42impl GenerateConfig for DedupeConfig {
43 fn generate_config() -> toml::Value {
44 toml::Value::try_from(Self {
45 fields: None,
46 cache: default_cache_config(),
47 time_settings: None,
48 })
49 .unwrap()
50 }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde(name = "dedupe")]
55impl TransformConfig for DedupeConfig {
56 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
57 if let Some(time_config) = &self.time_settings {
58 Ok(Transform::event_task(TimedDedupe::new(
59 self.cache.num_events,
60 fill_default_fields_match(self.fields.as_ref()),
61 time_config.clone(),
62 )))
63 } else {
64 Ok(Transform::event_task(Dedupe::new(
65 self.cache.num_events,
66 fill_default_fields_match(self.fields.as_ref()),
67 )))
68 }
69 }
70
71 fn input(&self) -> Input {
72 Input::log()
73 }
74
75 fn outputs(
76 &self,
77 _: vector_lib::enrichment::TableRegistry,
78 input_definitions: &[(OutputId, schema::Definition)],
79 _: LogNamespace,
80 ) -> Vec<TransformOutput> {
81 vec![TransformOutput::new(
82 DataType::Log,
83 clone_input_definitions(input_definitions),
84 )]
85 }
86}
87
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::{ComponentKey, OutputId};
    use vector_lib::lookup::lookup_v2::ConfigTargetPath;

    use crate::config::schema::Definition;
    use crate::transforms::dedupe::common::TimedCacheConfig;
    use crate::{
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            test::create_topology,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds a config that compares events on exactly `fields` and remembers the last
    /// `num_events` events.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds a config that compares events on every field except `message`, `timestamp`
    /// and `given_fields`, remembering the last `num_events` events.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // NOTE(review): `message` and `timestamp` are presumably the fields that
        // `LogEvent::from("message")` populates on every test event — confirm.
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: CacheConfig {
                num_events: std::num::NonZeroUsize::new(num_events).expect("non-zero num_events"),
            },
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    /// Stamps `event` with the metadata these tests expect on every event received from
    /// `create_topology`: source `in`, upstream `transform`, and the legacy-namespace schema
    /// definition.
    fn set_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from("in")));
        event.set_upstream_id(Arc::new(OutputId::from("transform")));
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// `transform_config` must compare `first_path` and must not compare `second_path`: an
    /// event that differs from an earlier one only in `second_path` is expected to be dropped.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` in the compared `first_path`.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` only in the non-compared `second_path`.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // The first event has never been seen, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // A different compared value makes `event2` new as well.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // `event3` duplicates `event1` on every compared field and is dropped; the `None`
            // after shutdown confirms nothing further was emitted.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// The same value stored under two different field names must not count as a duplicate.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Same value, different field name: still a new event.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Events with identical fields inserted in a different order are duplicates.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Same fields and values as `event1`, inserted in reverse order.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Dropped as a duplicate; the `None` after shutdown confirms it.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Callers use a one-event cache: seeing `event2` evicts `event1`, so a repeated `event1`
    /// is treated as new again.
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // `event1` has aged out of the cache, so it passes again. It already carries the
            // expected metadata from above, so it can be compared directly.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Callers configure a 100ms max age without `refresh_on_drop`. An immediate repeat is
    /// dropped, and the same event passes again once the max age has elapsed.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Immediate repeat within the max age: dropped as a duplicate.
            tx.send(event1.clone()).await.unwrap();

            // Wait just past the 100ms max age so the cached entry expires.
            tokio::time::sleep(Duration::from_millis(101)).await;

            // `event1` already carries the expected metadata from above.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// A string `"123"` and an integer `123` are different values, not duplicates.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Type differences inside nested objects also make events distinct.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// A field explicitly set to null is distinct from the field being absent.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            // No `matched` field at all.
            let mut event2 = Event::Log(LogEvent::from("message"));

            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();
            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}