vector/transforms/sample/
config.rs

1use snafu::Snafu;
2use vector_lib::config::{LegacyKey, LogNamespace};
3use vector_lib::configurable::configurable_component;
4use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path};
5use vrl::value::Kind;
6
7use crate::{
8    conditions::AnyCondition,
9    config::{
10        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11        TransformOutput,
12    },
13    schema,
14    template::Template,
15    transforms::Transform,
16};
17
18use super::transform::{Sample, SampleMode};
19
20#[derive(Debug, Snafu)]
21pub enum SampleError {
22    // Errors from `determine_sample_mode`
23    #[snafu(display(
24        "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
25    ))]
26    InvalidRatio { ratio: f64 },
27
28    #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
29    InvalidRate,
30
31    #[snafu(display(
32        "Exactly one value must be provided for either 'rate' or 'ratio', but not both"
33    ))]
34    InvalidConfiguration,
35}
36
37/// Configuration for the `sample` transform.
38#[configurable_component(transform(
39    "sample",
40    "Sample events from an event stream based on supplied criteria and at a configurable rate."
41))]
42#[derive(Clone, Debug)]
43#[serde(deny_unknown_fields)]
44pub struct SampleConfig {
45    /// The rate at which events are forwarded, expressed as `1/N`.
46    ///
47    /// For example, `rate = 1500` means 1 out of every 1500 events are forwarded and the rest are
48    /// dropped. This differs from `ratio` which allows more precise control over the number of events
49    /// retained and values greater than 1/2. It is an error to provide a value for both `rate` and `ratio`.
50    #[configurable(metadata(docs::examples = 1500))]
51    pub rate: Option<u64>,
52
53    /// The rate at which events are forwarded, expressed as a percentage
54    ///
55    /// For example, `ratio = .13` means that 13% out of all events on the stream are forwarded and
56    /// the rest are dropped. This differs from `rate` allowing the configuration of a higher
57    /// precision value and also the ability to retain values of greater than 50% of all events. It is
58    /// an error to provide a value for both `rate` and `ratio`.
59    #[configurable(metadata(docs::examples = 0.13))]
60    #[configurable(validation(range(min = 0.0, max = 1.0)))]
61    pub ratio: Option<f64>,
62
63    /// The name of the field whose value is hashed to determine if the event should be
64    /// sampled.
65    ///
66    /// Each unique value for the key creates a bucket of related events to be sampled together
67    /// and the rate is applied to the buckets themselves to sample `1/N` buckets.  The overall rate
68    /// of sampling may differ from the configured one if values in the field are not uniformly
69    /// distributed. If left unspecified, or if the event doesn’t have `key_field`, then the
70    /// event is sampled independently.
71    ///
72    /// This can be useful to, for example, ensure that all logs for a given transaction are
73    /// sampled together, but that overall `1/N` transactions are sampled.
74    #[configurable(metadata(docs::examples = "message"))]
75    pub key_field: Option<String>,
76
77    /// The event key in which the sample rate is stored. If set to an empty string, the sample rate will not be added to the event.
78    #[configurable(metadata(docs::examples = "sample_rate"))]
79    #[serde(default = "default_sample_rate_key")]
80    pub sample_rate_key: OptionalValuePath,
81
82    /// The value to group events into separate buckets to be sampled independently.
83    ///
84    /// If left unspecified, or if the event doesn't have `group_by`, then the event is not
85    /// sampled separately.
86    #[configurable(metadata(
87        docs::examples = "{{ service }}",
88        docs::examples = "{{ hostname }}-{{ service }}"
89    ))]
90    pub group_by: Option<Template>,
91
92    /// A logical condition used to exclude events from sampling.
93    pub exclude: Option<AnyCondition>,
94}
95
96impl SampleConfig {
97    fn sample_rate(&self) -> Result<SampleMode, SampleError> {
98        match (self.rate, self.ratio) {
99            (None, Some(ratio)) => {
100                if ratio <= 0.0 {
101                    Err(SampleError::InvalidRatio { ratio })
102                } else {
103                    Ok(SampleMode::new_ratio(ratio))
104                }
105            }
106            (Some(rate), None) => {
107                if rate == 0 {
108                    Err(SampleError::InvalidRate)
109                } else {
110                    Ok(SampleMode::new_rate(rate))
111                }
112            }
113            _ => Err(SampleError::InvalidConfiguration),
114        }
115    }
116}
117
118impl GenerateConfig for SampleConfig {
119    fn generate_config() -> toml::Value {
120        toml::Value::try_from(Self {
121            rate: None,
122            ratio: Some(0.1),
123            key_field: None,
124            group_by: None,
125            exclude: None::<AnyCondition>,
126            sample_rate_key: default_sample_rate_key(),
127        })
128        .unwrap()
129    }
130}
131
132#[async_trait::async_trait]
133#[typetag::serde(name = "sample")]
134impl TransformConfig for SampleConfig {
135    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
136        Ok(Transform::function(Sample::new(
137            Self::NAME.to_string(),
138            self.sample_rate()?,
139            self.key_field.clone(),
140            self.group_by.clone(),
141            self.exclude
142                .as_ref()
143                .map(|condition| condition.build(&context.enrichment_tables))
144                .transpose()?,
145            self.sample_rate_key.clone(),
146        )))
147    }
148
149    fn input(&self) -> Input {
150        Input::new(DataType::Log | DataType::Trace)
151    }
152
153    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
154        self.sample_rate()
155            .map(|_| ())
156            .map_err(|e| vec![e.to_string()])
157    }
158
159    fn outputs(
160        &self,
161        _: vector_lib::enrichment::TableRegistry,
162        input_definitions: &[(OutputId, schema::Definition)],
163        _: LogNamespace,
164    ) -> Vec<TransformOutput> {
165        vec![TransformOutput::new(
166            DataType::Log | DataType::Trace,
167            input_definitions
168                .iter()
169                .map(|(output, definition)| {
170                    (
171                        output.clone(),
172                        definition.clone().with_source_metadata(
173                            SampleConfig::NAME,
174                            Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
175                            &owned_value_path!("sample_rate"),
176                            Kind::bytes(),
177                            None,
178                        ),
179                    )
180                })
181                .collect(),
182        )]
183    }
184}
185
186pub fn default_sample_rate_key() -> OptionalValuePath {
187    OptionalValuePath::from(owned_value_path!("sample_rate"))
188}
189
#[cfg(test)]
mod tests {
    use super::SampleConfig;

    /// Runs the shared `test_generate_config` helper against `SampleConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }
}
198}