vector/transforms/dedupe/common.rs
1use std::{num::NonZeroUsize, time::Duration};
2
3use serde_with::serde_as;
4use vector_lib::{configurable::configurable_component, lookup::lookup_v2::ConfigTargetPath};
5
6use crate::config::log_schema;
7
/// Caching configuration for deduplication.
// NOTE(review): the `///` text on this type and its field is presumably consumed by
// `#[configurable_component]` when generating the configuration schema/docs, so it is kept
// verbatim here — confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug)]
// Configuration keys that are not declared on this struct are rejected during deserialization
// instead of being silently ignored.
#[serde(deny_unknown_fields)]
pub struct CacheConfig {
    /// Number of events to cache and use for comparing incoming events to previously seen events.
    // `NonZeroUsize` makes a zero-sized cache unrepresentable at the type level.
    pub num_events: NonZeroUsize,
}
16
/// Configuration for time based cache.
// NOTE(review): `#[serde_as]` is kept above `#[configurable_component]`; serde_with documents
// that it must be applied before the serde derives, which are presumably generated by
// `configurable_component` — confirm before reordering these attributes.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
// Configuration keys that are not declared on this struct are rejected during deserialization
// instead of being silently ignored.
#[serde(deny_unknown_fields)]
pub struct TimedCacheConfig {
    /// Maximum age of items in deduplication cache, before duplicates are allowed again.
    // (De)serialized as an integer number of milliseconds (`u64`) rather than through serde's
    // default `Duration` representation.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    pub max_age_ms: Duration,

    /// Set to true if dropped duplicates should refresh the age timer.
    // Optional in configuration: when the key is omitted the value is produced by
    // `crate::serde::default_false` (presumably `false`; the helper is defined outside this
    // file).
    #[serde(default = "crate::serde::default_false")]
    pub refresh_on_drop: bool,
}
31
32pub const fn default_cache_config() -> CacheConfig {
33 CacheConfig {
34 num_events: NonZeroUsize::new(5000).expect("static non-zero number"),
35 }
36}
37
/// Options to control what fields to match against.
///
/// When no field matching configuration is specified, events are matched using the `timestamp`,
/// `host`, and `message` fields from an event. The specific field names used are those set in
/// the global [`log schema`][global_log_schema] configuration.
///
/// [global_log_schema]: https://vector.dev/docs/reference/configuration/global-options/#log_schema
// TODO: This enum renders correctly in terms of providing equivalent Cue output when using the
// machine-generated stuff vs the previously-hand-written Cue... but what it _doesn't_ have in the
// machine-generated output is any sort of blurb that these "fields" (`match` and `ignore`) are
// actually mutually exclusive.
//
// We know that to be the case when we're generating the output from the configuration schema, so we
// need to emit something in that output to indicate as much, and further, actually use it on the
// Cue side to add some sort of boilerplate about them being mutually exclusive, etc.
//
// NOTE(review): the `///` text on this enum and its variants is presumably what the
// machine-generated schema/docs mentioned above are built from, so it is kept verbatim —
// confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug)]
// Keys other than the variant names declared below are rejected during deserialization.
#[serde(deny_unknown_fields)]
pub enum FieldMatchConfig {
    /// Matches events using only the specified fields.
    // Exposed under the name `match` in (de)serialized configuration.
    #[serde(rename = "match")]
    MatchFields(
        // Example field paths attached as `docs::examples` metadata.
        #[configurable(metadata(
            docs::examples = "field1",
            docs::examples = "parent.child_field"
        ))]
        Vec<ConfigTargetPath>,
    ),

    /// Matches events using all fields except for the ignored ones.
    // Exposed under the name `ignore` in (de)serialized configuration.
    #[serde(rename = "ignore")]
    IgnoreFields(
        // Example field paths attached as `docs::examples` metadata.
        #[configurable(metadata(
            docs::examples = "field1",
            docs::examples = "parent.child_field",
            docs::examples = "host",
            docs::examples = "hostname"
        ))]
        Vec<ConfigTargetPath>,
    ),
}
79
80pub fn fill_default_fields_match(maybe_fields: Option<&FieldMatchConfig>) -> FieldMatchConfig {
81 // We provide a default value on `fields`, based on `default_match_fields`, in order to
82 // drive the configuration schema and documentation. Since we're getting the values from the
83 // configured log schema, though, the default field values shown in the configuration
84 // schema/documentation may not be the same as an actual user's Vector configuration.
85 match maybe_fields {
86 Some(FieldMatchConfig::MatchFields(x)) => FieldMatchConfig::MatchFields(x.clone()),
87 Some(FieldMatchConfig::IgnoreFields(y)) => FieldMatchConfig::IgnoreFields(y.clone()),
88 None => FieldMatchConfig::MatchFields(default_match_fields()),
89 }
90}
91
92// TODO: Add support to the `configurable(metadata(..))` helper attribute for passing an expression
93// that will provide the value for the metadata attribute's value, as well as letting all metadata
94// attributes have whatever value they want, so long as it can be serialized by `serde_json`.
95//
96// Once we have that, we could curry these default values (and others) via a metadata attribute
97// instead of via `serde(default = "...")` to allow for displaying default values in the
98// configuration schema _without_ actually changing how a field is populated during deserialization.
99//
100// See the comment in `fill_default_fields_match` for more information on why this is required.
101//
102// TODO: These values are used even for events with the new "Vector" log namespace.
103// These aren't great defaults in that case, but hard-coding isn't much better since the
104// structure can vary significantly. This should probably either become a required field
105// in the future, or maybe the "semantic meaning" can be utilized here.
106fn default_match_fields() -> Vec<ConfigTargetPath> {
107 let mut fields = Vec::new();
108 if let Some(message_key) = log_schema().message_key_target_path() {
109 fields.push(ConfigTargetPath(message_key.clone()));
110 }
111 if let Some(host_key) = log_schema().host_key_target_path() {
112 fields.push(ConfigTargetPath(host_key.clone()));
113 }
114 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
115 fields.push(ConfigTargetPath(timestamp_key.clone()));
116 }
117 fields
118}