vector/transforms/sample/
config.rs1use snafu::Snafu;
2use vector_lib::{
3    config::{LegacyKey, LogNamespace},
4    configurable::configurable_component,
5    lookup::{lookup_v2::OptionalValuePath, owned_value_path},
6};
7use vrl::value::Kind;
8
9use super::transform::{Sample, SampleMode};
10use crate::{
11    conditions::AnyCondition,
12    config::{
13        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
14        TransformOutput,
15    },
16    schema,
17    template::Template,
18    transforms::Transform,
19};
20
21#[derive(Debug, Snafu)]
22pub enum SampleError {
23    #[snafu(display(
25        "Only positive, non-zero numbers are allowed values for `ratio`, value: {ratio}"
26    ))]
27    InvalidRatio { ratio: f64 },
28
29    #[snafu(display("Only non-zero numbers are allowed values for `rate`"))]
30    InvalidRate,
31
32    #[snafu(display(
33        "Exactly one value must be provided for either 'rate' or 'ratio', but not both"
34    ))]
35    InvalidConfiguration,
36}
37
38#[configurable_component(transform(
40    "sample",
41    "Sample events from an event stream based on supplied criteria and at a configurable rate."
42))]
43#[derive(Clone, Debug)]
44#[serde(deny_unknown_fields)]
45pub struct SampleConfig {
46    #[configurable(metadata(docs::examples = 1500))]
52    pub rate: Option<u64>,
53
54    #[configurable(metadata(docs::examples = 0.13))]
61    #[configurable(validation(range(min = 0.0, max = 1.0)))]
62    pub ratio: Option<f64>,
63
64    #[configurable(metadata(docs::examples = "message"))]
76    pub key_field: Option<String>,
77
78    #[configurable(metadata(docs::examples = "sample_rate"))]
80    #[serde(default = "default_sample_rate_key")]
81    pub sample_rate_key: OptionalValuePath,
82
83    #[configurable(metadata(
88        docs::examples = "{{ service }}",
89        docs::examples = "{{ hostname }}-{{ service }}"
90    ))]
91    pub group_by: Option<Template>,
92
93    pub exclude: Option<AnyCondition>,
95}
96
97impl SampleConfig {
98    fn sample_rate(&self) -> Result<SampleMode, SampleError> {
99        match (self.rate, self.ratio) {
100            (None, Some(ratio)) => {
101                if ratio <= 0.0 {
102                    Err(SampleError::InvalidRatio { ratio })
103                } else {
104                    Ok(SampleMode::new_ratio(ratio))
105                }
106            }
107            (Some(rate), None) => {
108                if rate == 0 {
109                    Err(SampleError::InvalidRate)
110                } else {
111                    Ok(SampleMode::new_rate(rate))
112                }
113            }
114            _ => Err(SampleError::InvalidConfiguration),
115        }
116    }
117}
118
119impl GenerateConfig for SampleConfig {
120    fn generate_config() -> toml::Value {
121        toml::Value::try_from(Self {
122            rate: None,
123            ratio: Some(0.1),
124            key_field: None,
125            group_by: None,
126            exclude: None::<AnyCondition>,
127            sample_rate_key: default_sample_rate_key(),
128        })
129        .unwrap()
130    }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "sample")]
135impl TransformConfig for SampleConfig {
136    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
137        Ok(Transform::function(Sample::new(
138            Self::NAME.to_string(),
139            self.sample_rate()?,
140            self.key_field.clone(),
141            self.group_by.clone(),
142            self.exclude
143                .as_ref()
144                .map(|condition| condition.build(&context.enrichment_tables))
145                .transpose()?,
146            self.sample_rate_key.clone(),
147        )))
148    }
149
150    fn input(&self) -> Input {
151        Input::new(DataType::Log | DataType::Trace)
152    }
153
154    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
155        self.sample_rate()
156            .map(|_| ())
157            .map_err(|e| vec![e.to_string()])
158    }
159
160    fn outputs(
161        &self,
162        _: vector_lib::enrichment::TableRegistry,
163        input_definitions: &[(OutputId, schema::Definition)],
164        _: LogNamespace,
165    ) -> Vec<TransformOutput> {
166        vec![TransformOutput::new(
167            DataType::Log | DataType::Trace,
168            input_definitions
169                .iter()
170                .map(|(output, definition)| {
171                    (
172                        output.clone(),
173                        definition.clone().with_source_metadata(
174                            SampleConfig::NAME,
175                            Some(LegacyKey::Overwrite(owned_value_path!("sample_rate"))),
176                            &owned_value_path!("sample_rate"),
177                            Kind::bytes(),
178                            None,
179                        ),
180                    )
181                })
182                .collect(),
183        )]
184    }
185}
186
187pub fn default_sample_rate_key() -> OptionalValuePath {
188    OptionalValuePath::from(owned_value_path!("sample_rate"))
189}
190
#[cfg(test)]
mod tests {
    use super::SampleConfig;

    /// Runs the shared `test_generate_config` helper against `SampleConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SampleConfig>();
    }
}