vector/transforms/sample/
config.rs

1use snafu::Snafu;
2use vector_lib::{
3    config::{LegacyKey, LogNamespace},
4    configurable::configurable_component,
5    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
6};
7use vrl::value::Kind;
8
9use super::transform::{Sample, SampleMode};
10use crate::{
11    conditions::AnyCondition,
12    config::{
13        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14        TransformOutput,
15    },
16    schema,
17    template::Template,
18    transforms::Transform,
19};
20
/// Errors reported when the `rate` / `ratio` settings of a `SampleConfig` cannot be turned
/// into a sampling mode.
#[derive(Debug, Snafu)]
pub enum SampleError {
    // Errors from `SampleConfig::sample_rate`
    /// Returned when `ratio` is zero or negative; carries the offending value for the message.
    #[snafu(display(
        "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
    ))]
    InvalidRatio { ratio: f64 },

    /// Returned when `rate` is zero.
    #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
    InvalidRate,

    /// Returned when `rate` and `ratio` are both set, or both left unset.
    #[snafu(display(
        "Exactly one value must be provided for either 'rate' or 'ratio', but not both"
    ))]
    InvalidConfiguration,
}
37
/// Configuration for the `sample` transform.
#[configurable_component(transform(
    "sample",
    "Sample events from an event stream based on supplied criteria and at a configurable rate."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct SampleConfig {
    /// The rate at which events are forwarded, expressed as `1/N`.
    ///
    /// For example, `rate = 1500` means 1 out of every 1500 events is forwarded and the rest are
    /// dropped. This differs from `ratio` which allows more precise control over the number of events
    /// retained and values greater than 1/2. It is an error to provide a value for both `rate` and `ratio`.
    #[configurable(metadata(docs::examples = 1500))]
    pub rate: Option<u64>,

    /// The rate at which events are forwarded, expressed as a fraction between `0.0` and `1.0`.
    ///
    /// For example, `ratio = 0.13` means that 13% out of all events on the stream are forwarded and
    /// the rest are dropped. This differs from `rate` allowing the configuration of a higher
    /// precision value and also the ability to retain values of greater than 50% of all events. It is
    /// an error to provide a value for both `rate` and `ratio`.
    #[configurable(metadata(docs::examples = 0.13))]
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    pub ratio: Option<f64>,

    /// The name of the field whose value is hashed to determine if the event should be
    /// sampled.
    ///
    /// Each unique value for the key creates a bucket of related events to be sampled together
    /// and the rate is applied to the buckets themselves to sample `1/N` buckets. The overall rate
    /// of sampling may differ from the configured one if values in the field are not uniformly
    /// distributed. If left unspecified, or if the event doesn't have `key_field`, then the
    /// event is sampled independently.
    ///
    /// This can be useful to, for example, ensure that all logs for a given transaction are
    /// sampled together, but that overall `1/N` transactions are sampled.
    #[configurable(metadata(docs::examples = "message"))]
    pub key_field: Option<String>,

    /// The event key in which the sample rate is stored. If set to an empty string, the sample rate will not be added to the event.
    #[configurable(metadata(docs::examples = "sample_rate"))]
    #[serde(default = "default_sample_rate_key")]
    pub sample_rate_key: OptionalValuePath,

    /// The value to group events into separate buckets to be sampled independently.
    ///
    /// If left unspecified, or if the event doesn't have `group_by`, then the event is not
    /// sampled separately.
    #[configurable(metadata(
        docs::examples = "{{ service }}",
        docs::examples = "{{ hostname }}-{{ service }}"
    ))]
    pub group_by: Option<Template>,

    /// A logical condition used to exclude events from sampling.
    pub exclude: Option<AnyCondition>,
}
96
97impl SampleConfig {
98    fn sample_rate(&self) -> Result<SampleMode, SampleError> {
99        match (self.rate, self.ratio) {
100            (None, Some(ratio)) => {
101                if ratio <= 0.0 {
102                    Err(SampleError::InvalidRatio { ratio })
103                } else {
104                    Ok(SampleMode::new_ratio(ratio))
105                }
106            }
107            (Some(rate), None) => {
108                if rate == 0 {
109                    Err(SampleError::InvalidRate)
110                } else {
111                    Ok(SampleMode::new_rate(rate))
112                }
113            }
114            _ => Err(SampleError::InvalidConfiguration),
115        }
116    }
117}
118
119impl GenerateConfig for SampleConfig {
120    fn generate_config() -> toml::Value {
121        toml::Value::try_from(Self {
122            rate: None,
123            ratio: Some(0.1),
124            key_field: None,
125            group_by: None,
126            exclude: None::<AnyCondition>,
127            sample_rate_key: default_sample_rate_key(),
128        })
129        .unwrap()
130    }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "sample")]
135impl TransformConfig for SampleConfig {
136    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
137        Ok(Transform::function(Sample::new(
138            Self::NAME.to_string(),
139            self.sample_rate()?,
140            self.key_field.clone(),
141            self.group_by.clone(),
142            self.exclude
143                .as_ref()
144                .map(|condition| condition.build(&context.enrichment_tables))
145                .transpose()?,
146            self.sample_rate_key.clone(),
147        )))
148    }
149
150    fn input(&self) -> Input {
151        Input::new(DataType::Log | DataType::Trace)
152    }
153
154    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
155        self.sample_rate()
156            .map(|_| ())
157            .map_err(|e| vec![e.to_string()])
158    }
159
160    fn outputs(
161        &self,
162        _: vector_lib::enrichment::TableRegistry,
163        input_definitions: &[(OutputId, schema::Definition)],
164        _: LogNamespace,
165    ) -> Vec<TransformOutput> {
166        vec![TransformOutput::new(
167            DataType::Log | DataType::Trace,
168            input_definitions
169                .iter()
170                .map(|(output, definition)| {
171                    (
172                        output.clone(),
173                        definition.clone().with_source_metadata(
174                            SampleConfig::NAME,
175                            Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
176                            &owned_value_path!("sample_rate"),
177                            Kind::bytes(),
178                            None,
179                        ),
180                    )
181                })
182                .collect(),
183        )]
184    }
185}
186
187pub fn default_sample_rate_key() -> OptionalValuePath {
188    OptionalValuePath::from(owned_value_path!("sample_rate"))
189}
190
#[cfg(test)]
mod tests {
    use super::SampleConfig;

    /// Runs the shared `test_generate_config` helper against `SampleConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }
}