// vector/transforms/dedupe/common.rs

1use std::{num::NonZeroUsize, time::Duration};
2
3use serde_with::serde_as;
4use vector_lib::{configurable::configurable_component, lookup::lookup_v2::ConfigTargetPath};
5
6use crate::config::log_schema;
7
8/// Caching configuration for deduplication.
9#[configurable_component]
10#[derive(Clone, Debug)]
11#[serde(deny_unknown_fields)]
12pub struct CacheConfig {
13    /// Number of events to cache and use for comparing incoming events to previously seen events.
14    pub num_events: NonZeroUsize,
15}
16
17/// Configuration for time based cache.
18#[serde_as]
19#[configurable_component]
20#[derive(Clone, Debug)]
21#[serde(deny_unknown_fields)]
22pub struct TimedCacheConfig {
23    /// Maximum age of items in deduplication cache, before duplicates are allowed again.
24    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
25    pub max_age_ms: Duration,
26
27    /// Set to true if dropped duplicates should refresh the age timer.
28    #[serde(default = "crate::serde::default_false")]
29    pub refresh_on_drop: bool,
30}
31
32pub const fn default_cache_config() -> CacheConfig {
33    CacheConfig {
34        num_events: NonZeroUsize::new(5000).expect("static non-zero number"),
35    }
36}
37
38/// Options to control what fields to match against.
39///
40/// When no field matching configuration is specified, events are matched using the `timestamp`,
41/// `host`, and `message` fields from an event. The specific field names used are those set in
42/// the global [`log schema`][global_log_schema] configuration.
43///
44/// [global_log_schema]: https://vector.dev/docs/reference/configuration/global-options/#log_schema
45// TODO: This enum renders correctly in terms of providing equivalent Cue output when using the
46// machine-generated stuff vs the previously-hand-written Cue... but what it _doesn't_ have in the
47// machine-generated output is any sort of blurb that these "fields" (`match` and `ignore`) are
48// actually mutually exclusive.
49//
50// We know that to be the case when we're generating the output from the configuration schema, so we
51// need to emit something in that output to indicate as much, and further, actually use it on the
52// Cue side to add some sort of boilerplate about them being mutually exclusive, etc.
53#[configurable_component]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub enum FieldMatchConfig {
57    /// Matches events using only the specified fields.
58    #[serde(rename = "match")]
59    MatchFields(
60        #[configurable(metadata(
61            docs::examples = "field1",
62            docs::examples = "parent.child_field"
63        ))]
64        Vec<ConfigTargetPath>,
65    ),
66
67    /// Matches events using all fields except for the ignored ones.
68    #[serde(rename = "ignore")]
69    IgnoreFields(
70        #[configurable(metadata(
71            docs::examples = "field1",
72            docs::examples = "parent.child_field",
73            docs::examples = "host",
74            docs::examples = "hostname"
75        ))]
76        Vec<ConfigTargetPath>,
77    ),
78}
79
80pub fn fill_default_fields_match(maybe_fields: Option<&FieldMatchConfig>) -> FieldMatchConfig {
81    // We provide a default value on `fields`, based on `default_match_fields`, in order to
82    // drive the configuration schema and documentation. Since we're getting the values from the
83    // configured log schema, though, the default field values shown in the configuration
84    // schema/documentation may not be the same as an actual user's Vector configuration.
85    match maybe_fields {
86        Some(FieldMatchConfig::MatchFields(x)) => FieldMatchConfig::MatchFields(x.clone()),
87        Some(FieldMatchConfig::IgnoreFields(y)) => FieldMatchConfig::IgnoreFields(y.clone()),
88        None => FieldMatchConfig::MatchFields(default_match_fields()),
89    }
90}
91
92// TODO: Add support to the `configurable(metadata(..))` helper attribute for passing an expression
93// that will provide the value for the metadata attribute's value, as well as letting all metadata
94// attributes have whatever value they want, so long as it can be serialized by `serde_json`.
95//
96// Once we have that, we could curry these default values (and others) via a metadata attribute
97// instead of via `serde(default = "...")` to allow for displaying default values in the
98// configuration schema _without_ actually changing how a field is populated during deserialization.
99//
100// See the comment in `fill_default_fields_match` for more information on why this is required.
101//
102// TODO: These values are used even for events with the new "Vector" log namespace.
103//   These aren't great defaults in that case, but hard-coding isn't much better since the
104//   structure can vary significantly. This should probably either become a required field
105//   in the future, or maybe the "semantic meaning" can be utilized here.
106fn default_match_fields() -> Vec<ConfigTargetPath> {
107    let mut fields = Vec::new();
108    if let Some(message_key) = log_schema().message_key_target_path() {
109        fields.push(ConfigTargetPath(message_key.clone()));
110    }
111    if let Some(host_key) = log_schema().host_key_target_path() {
112        fields.push(ConfigTargetPath(host_key.clone()));
113    }
114    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
115        fields.push(ConfigTargetPath(timestamp_key.clone()));
116    }
117    fields
118}