1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::{
4 common::{
5 CacheConfig, FieldMatchConfig, TimedCacheConfig, default_cache_config,
6 fill_default_fields_match,
7 },
8 timed_transform::TimedDedupe,
9 transform::Dedupe,
10};
11use crate::{
12 config::{
13 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14 TransformOutput,
15 },
16 schema,
17 transforms::Transform,
18};
19
20#[configurable_component(transform("dedupe", "Deduplicate logs passing through a topology."))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct DedupeConfig {
25 #[configurable(derived)]
26 #[serde(default)]
27 pub fields: Option<FieldMatchConfig>,
28
29 #[configurable(derived)]
30 #[serde(default = "default_cache_config")]
31 pub cache: CacheConfig,
32
33 #[configurable(derived)]
34 #[serde(default)]
35 pub time_settings: Option<TimedCacheConfig>,
36}
37
38impl GenerateConfig for DedupeConfig {
39 fn generate_config() -> toml::Value {
40 toml::Value::try_from(Self {
41 fields: None,
42 cache: default_cache_config(),
43 time_settings: None,
44 })
45 .unwrap()
46 }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "dedupe")]
51impl TransformConfig for DedupeConfig {
52 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
53 if let Some(time_config) = &self.time_settings {
54 Ok(Transform::event_task(TimedDedupe::new(
55 self.cache.num_events,
56 fill_default_fields_match(self.fields.as_ref()),
57 time_config.clone(),
58 )))
59 } else {
60 Ok(Transform::event_task(Dedupe::new(
61 self.cache.num_events,
62 fill_default_fields_match(self.fields.as_ref()),
63 )))
64 }
65 }
66
67 fn input(&self) -> Input {
68 Input::log()
69 }
70
71 fn outputs(
72 &self,
73 _: &TransformContext,
74 input_definitions: &[(OutputId, schema::Definition)],
75 ) -> Vec<TransformOutput> {
76 vec![TransformOutput::new(
77 DataType::Log,
78 clone_input_definitions(input_definitions),
79 )]
80 }
81}
82
#[cfg(test)]
mod tests {
    use std::{num::NonZeroUsize, sync::Arc, time::Duration};

    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::{ComponentKey, OutputId},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use crate::{
        config::schema::Definition,
        event::{Event, LogEvent, ObjectMap, Value},
        test_util::components::assert_transform_compliance,
        transforms::{
            dedupe::{
                common::TimedCacheConfig,
                config::{CacheConfig, DedupeConfig, FieldMatchConfig},
            },
            test::create_topology,
        },
    };

    const TEST_SOURCE_COMPONENT_ID: &str = "in";
    const TEST_UPSTREAM_COMPONENT_ID: &str = "transform";
    const TEST_SOURCE_TYPE: &str = "unit_test_stream";

    /// Stamps `event` with the metadata the tests expect on events received from the test
    /// topology, so that an input event can be compared against the received one with
    /// `assert_eq!`.
    fn set_expected_metadata(event: &mut Event) {
        event.set_source_id(Arc::new(ComponentKey::from(TEST_SOURCE_COMPONENT_ID)));
        event.set_upstream_id(Arc::new(OutputId::from(TEST_UPSTREAM_COMPONENT_ID)));
        event.set_source_type(TEST_SOURCE_TYPE);
        // The received events are expected to carry the default legacy-namespace definition.
        event
            .metadata_mut()
            .set_schema_definition(&Arc::new(Definition::default_legacy_namespace()));
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DedupeConfig>();
    }

    /// Builds a [`CacheConfig`] that holds `num_events` entries.
    ///
    /// # Panics
    ///
    /// Panics if `num_events` is zero.
    const fn cache_of(num_events: usize) -> CacheConfig {
        CacheConfig {
            num_events: NonZeroUsize::new(num_events).expect("non-zero num_events"),
        }
    }

    /// Builds a config that compares events on exactly the given `fields`, with a cache of
    /// `num_events` entries and no time settings.
    const fn make_match_transform_config(
        num_events: usize,
        fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        DedupeConfig {
            cache: cache_of(num_events),
            fields: Some(FieldMatchConfig::MatchFields(fields)),
            time_settings: None,
        }
    }

    /// Builds a config that ignores `given_fields` in addition to `message` and `timestamp`,
    /// with a cache of `num_events` entries and no time settings.
    fn make_ignore_transform_config(
        num_events: usize,
        given_fields: Vec<ConfigTargetPath>,
    ) -> DedupeConfig {
        // Every event in these tests is built with `LogEvent::from("message")`, so `message`
        // (and `timestamp`, which is presumably populated alongside it -- confirm) is always
        // excluded from the comparison.
        let mut fields = vec!["message".into(), "timestamp".into()];
        fields.extend(given_fields);

        DedupeConfig {
            cache: cache_of(num_events),
            fields: Some(FieldMatchConfig::IgnoreFields(fields)),
            time_settings: None,
        }
    }

    #[tokio::test]
    async fn dedupe_match_basic() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_basic() {
        let transform_config = make_ignore_transform_config(5, vec!["unmatched".into()]);
        basic(transform_config, "matched", "unmatched").await;
    }

    #[tokio::test]
    async fn dedupe_ignore_with_metadata_field() {
        let transform_config = make_ignore_transform_config(5, vec!["%ignored".into()]);
        basic(transform_config, "matched", "%ignored").await;
    }

    /// Checks that only the value at `first_path` decides whether an event is a duplicate;
    /// the value at `second_path` must not influence the outcome.
    async fn basic(transform_config: DedupeConfig, first_path: &str, second_path: &str) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert(first_path, "some value");
            event1.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` only at `first_path`.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert(first_path, "some value2");
            event2.as_mut_log().insert(second_path, "another value");

            // Differs from `event1` only at `second_path`.
            let mut event3 = Event::Log(LogEvent::from("message"));
            event3.as_mut_log().insert(first_path, "some value");
            event3.as_mut_log().insert(second_path, "another value2");

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // The second event has a new value at `first_path`, so it passes through even
            // though its `second_path` value repeats.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            // The third event repeats the first event's `first_path` value, so it must be
            // dropped: nothing else may arrive before the output closes.
            tx.send(event3).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_name_matters() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_name_matters(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_name_matters() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        field_name_matters(transform_config).await;
    }

    /// Checks that the same value stored under two different field names does not make
    /// two events duplicates of each other.
    async fn field_name_matters(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "some value");

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // Same value, different field name: the second event must pass through as well.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_field_order_irrelevant() {
        let transform_config =
            make_match_transform_config(5, vec!["matched1".into(), "matched2".into()]);
        field_order_irrelevant(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_field_order_irrelevant() {
        let transform_config = make_ignore_transform_config(5, vec!["randomData".into()]);
        field_order_irrelevant(transform_config).await;
    }

    /// Checks that two events holding the same fields and values are duplicates regardless
    /// of the order in which the fields were inserted.
    async fn field_order_irrelevant(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched1", "value1");
            event1.as_mut_log().insert("matched2", "value2");

            // Same fields and values as `event1`, inserted in the opposite order.
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched2", "value2");
            event2.as_mut_log().insert("matched1", "value1");

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // The second event must be dropped: nothing else may arrive before the output
            // closes.
            tx.send(event2).await.unwrap();

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_age_out() {
        // A cache of one entry, so the second event evicts the first.
        let transform_config = make_match_transform_config(1, vec!["matched".into()]);
        age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_age_out() {
        // A cache of one entry, so the second event evicts the first.
        let transform_config = make_ignore_transform_config(1, vec![]);
        age_out(transform_config).await;
    }

    /// Checks that an event is no longer treated as a duplicate once its entry has been
    /// pushed out of the cache. Expects `transform_config` to use a one-entry cache.
    async fn age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", "some value2");

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // The second event differs, so it passes through and takes over the single
            // cache slot.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);

            assert_eq!(new_event, event2);

            // The first event is sent again; its entry has been evicted, so it must pass
            // through a second time.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_match_transform_config(5, vec!["matched".into()])
        };
        timed_age_out(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_timed_age_out() {
        let transform_config = DedupeConfig {
            time_settings: Some(TimedCacheConfig {
                max_age_ms: Duration::from_millis(100),
                refresh_on_drop: false,
            }),
            ..make_ignore_transform_config(1, vec![])
        };
        timed_age_out(transform_config).await;
    }

    /// Checks that a repeated event is dropped while its cache entry is younger than the
    /// configured maximum age and passes through again once that age has elapsed. Expects
    /// `transform_config` to use a maximum age of 100 ms.
    async fn timed_age_out(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "some value");

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // An immediate repeat is expected to be dropped; the final `None` check below
            // fails if it was emitted.
            tx.send(event1.clone()).await.unwrap();

            // Wait just past the 100 ms maximum age configured by the callers.
            tokio::time::sleep(Duration::from_millis(101)).await;

            // The cache entry has expired, so the same event must pass through again.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching(transform_config).await;
    }

    /// Checks that values of different types are never duplicates of each other: the
    /// string `"123"` and the integer `123` must both pass through.
    async fn type_matching(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", "123");

            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", 123);

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // The integer value must not be matched against the string value.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_type_matching_nested_objects() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        type_matching_nested_objects(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_type_matching_nested_objects() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        type_matching_nested_objects(transform_config).await;
    }

    /// Checks that value types are also respected inside nested objects: an object holding
    /// the string `"123"` and one holding the integer `123` must both pass through.
    async fn type_matching_nested_objects(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut map1 = ObjectMap::new();
            map1.insert("key".into(), "123".into());
            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", map1);

            let mut map2 = ObjectMap::new();
            map2.insert("key".into(), 123.into());
            let mut event2 = Event::Log(LogEvent::from("message"));
            event2.as_mut_log().insert("matched", map2);

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // The nested integer must not be matched against the nested string.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn dedupe_match_null_vs_missing() {
        let transform_config = make_match_transform_config(5, vec!["matched".into()]);
        null_vs_missing(transform_config).await;
    }

    #[tokio::test]
    async fn dedupe_ignore_null_vs_missing() {
        let transform_config = make_ignore_transform_config(5, vec![]);
        null_vs_missing(transform_config).await;
    }

    /// Checks that a field explicitly set to null is not a duplicate of the same field
    /// being absent: both events must pass through.
    async fn null_vs_missing(transform_config: DedupeConfig) {
        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            let mut event1 = Event::Log(LogEvent::from("message"));
            event1.as_mut_log().insert("matched", Value::Null);

            // Deliberately has no `matched` field at all.
            let mut event2 = Event::Log(LogEvent::from("message"));

            // The first event has nothing to be a duplicate of, so it passes through.
            tx.send(event1.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event1);
            assert_eq!(new_event, event1);

            // A missing field must not be matched against an explicit null.
            tx.send(event2.clone()).await.unwrap();
            let new_event = out.recv().await.unwrap();

            set_expected_metadata(&mut event2);
            assert_eq!(new_event, event2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
}