vector/transforms/sample/
transform.rs

1use std::{borrow::Cow, collections::HashMap, fmt};
2
3use vector_lib::{
4    config::LegacyKey,
5    lookup::{OwnedTargetPath, lookup_v2::OptionalValuePath},
6};
7
8use crate::{
9    conditions::Condition,
10    event::Event,
11    internal_events::SampleEventDiscarded,
12    sinks::prelude::TemplateRenderingError,
13    template::Template,
14    transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Exists only for backwards compatibility purposes so that the value of sample_rate_key is
/// consistent after the internal implementation of the Sample class was modified to work in terms
/// of percentages.
///
/// Each variant carries its configured setting together with the per-group state that
/// `increment` updates on every call.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Counter based sampling driven by an integer rate.
    Rate {
        /// Divisor of the decision: a call passes when the counter value read before the
        /// increment (or the hash of the key value) is a multiple of this number.
        rate: u64,
        /// Number of `increment` calls recorded so far, keyed by the optional group key passed
        /// to `increment`.
        counters: HashMap<Option<String>, u64>,
    },
    /// Accumulator based sampling driven by a fractional ratio.
    Ratio {
        /// Amount added to a group's accumulator on every `increment` call.
        ratio: f64,
        /// Accumulator per optional group key. A call passes when adding `ratio` brings the
        /// accumulator to 1.0 or more, in which case 1.0 is subtracted before it is stored.
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled to the range of `u64`; a hashed key value passes when its hash is less
        /// than or equal to this threshold.
        hash_ratio_threshold: u64,
    },
}
32
33impl SampleMode {
34    pub fn new_rate(rate: u64) -> Self {
35        Self::Rate {
36            rate,
37            counters: HashMap::default(),
38        }
39    }
40
41    pub fn new_ratio(ratio: f64) -> Self {
42        Self::Ratio {
43            ratio,
44            values: HashMap::default(),
45            // Supports the 'key_field' option, assuming an equal distribution of values for a given
46            // field, hashing its contents this component should output events according to the
47            // configured ratio.
48            //
49            // To do one option would be to convert the hash to a number between 0 and 1 and compare
50            // to the ratio. However to address issues with precision, here the ratio is scaled to
51            // meet the width of the type of the hash.
52            hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
53        }
54    }
55
56    fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
57        let threshold_exceeded = match self {
58            Self::Rate { rate, counters } => {
59                let counter_value = counters.entry(group_by_key.clone()).or_default();
60                let old_counter_value = *counter_value;
61                *counter_value += 1;
62                old_counter_value % *rate == 0
63            }
64            Self::Ratio { ratio, values, .. } => {
65                let value = values.entry(group_by_key.clone()).or_insert(1.0 - *ratio);
66                let increment: f64 = *value + *ratio;
67                *value = if increment >= 1.0 {
68                    increment - 1.0
69                } else {
70                    increment
71                };
72                increment >= 1.0
73            }
74        };
75        if let Some(value) = value {
76            self.hash_within_ratio(value.as_bytes())
77        } else {
78            threshold_exceeded
79        }
80    }
81
82    fn hash_within_ratio(&self, value: &[u8]) -> bool {
83        let hash = seahash::hash(value);
84        match self {
85            Self::Rate { rate, .. } => hash % rate == 0,
86            Self::Ratio {
87                hash_ratio_threshold,
88                ..
89            } => hash <= *hash_ratio_threshold,
90        }
91    }
92}
93
94impl fmt::Display for SampleMode {
95    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96        // Avoids the print of an additional '.0' which was not performed in the previous
97        // implementation
98        match self {
99            Self::Rate { rate, .. } => write!(f, "{rate}"),
100            Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
101        }
102    }
103}
104
/// State of the sample transform: the sampling mode plus the options consulted by `transform`.
#[derive(Clone)]
pub struct Sample {
    /// Passed as the source name when the sample rate is recorded on a log event.
    name: String,
    /// Sampling mode together with its running per-group state.
    rate: SampleMode,
    /// Path of an event field; when it resolves to a value, that value is hashed to make the
    /// sampling decision.
    key_field: Option<String>,
    /// Template rendered for each event to select the group whose sampling state is used.
    group_by: Option<Template>,
    /// Events for which this condition holds are forwarded without being sampled.
    exclude: Option<Condition>,
    /// When its path is set, the rate is written to that path on every event that is sampled.
    sample_rate_key: OptionalValuePath,
}
114
115impl Sample {
116    // This function is dead code when the feature flag `transforms-impl-sample` is specified but not
117    // `transforms-sample`.
118    #![allow(dead_code)]
119    pub const fn new(
120        name: String,
121        rate: SampleMode,
122        key_field: Option<String>,
123        group_by: Option<Template>,
124        exclude: Option<Condition>,
125        sample_rate_key: OptionalValuePath,
126    ) -> Self {
127        Self {
128            name,
129            rate,
130            key_field,
131            group_by,
132            exclude,
133            sample_rate_key,
134        }
135    }
136
137    #[cfg(test)]
138    pub fn ratio(&self) -> f64 {
139        match self.rate {
140            SampleMode::Rate { rate, .. } => 1.0f64 / rate as f64,
141            SampleMode::Ratio { ratio, .. } => ratio,
142        }
143    }
144}
145
146impl FunctionTransform for Sample {
147    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
148        let mut event = {
149            if let Some(condition) = self.exclude.as_ref() {
150                let (result, event) = condition.check(event);
151                if result {
152                    output.push(event);
153                    return;
154                } else {
155                    event
156                }
157            } else {
158                event
159            }
160        };
161
162        let value = self
163            .key_field
164            .as_ref()
165            .and_then(|key_field| match &event {
166                Event::Log(event) => event
167                    .parse_path_and_get_value(key_field.as_str())
168                    .ok()
169                    .flatten(),
170                Event::Trace(event) => event
171                    .parse_path_and_get_value(key_field.as_str())
172                    .ok()
173                    .flatten(),
174                Event::Metric(_) => panic!("component can never receive metric events"),
175            })
176            .map(|v| v.to_string_lossy());
177
178        // Fetch actual field value if group_by option is set.
179        let group_by_key = self.group_by.as_ref().and_then(|group_by| match &event {
180            Event::Log(event) => group_by
181                .render_string(event)
182                .map_err(|error| {
183                    emit!(TemplateRenderingError {
184                        error,
185                        field: Some("group_by"),
186                        drop_event: false,
187                    })
188                })
189                .ok(),
190            Event::Trace(event) => group_by
191                .render_string(event)
192                .map_err(|error| {
193                    emit!(TemplateRenderingError {
194                        error,
195                        field: Some("group_by"),
196                        drop_event: false,
197                    })
198                })
199                .ok(),
200            Event::Metric(_) => panic!("component can never receive metric events"),
201        });
202
203        let should_sample = self.rate.increment(&group_by_key, &value);
204        if should_sample {
205            if let Some(path) = &self.sample_rate_key.path {
206                match event {
207                    Event::Log(ref mut event) => {
208                        event.namespace().insert_source_metadata(
209                            self.name.as_str(),
210                            event,
211                            Some(LegacyKey::Overwrite(path)),
212                            path,
213                            self.rate.to_string(),
214                        );
215                    }
216                    Event::Trace(ref mut event) => {
217                        event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
218                    }
219                    Event::Metric(_) => panic!("component can never receive metric events"),
220                };
221            }
222            output.push(event);
223        } else {
224            emit!(SampleEventDiscarded);
225        }
226    }
227}