vector/transforms/sample/transform.rs

1use std::{borrow::Cow, collections::HashMap, fmt};
2use vector_lib::config::LegacyKey;
3
4use crate::{
5    conditions::Condition,
6    event::Event,
7    internal_events::SampleEventDiscarded,
8    sinks::prelude::TemplateRenderingError,
9    template::Template,
10    transforms::{FunctionTransform, OutputBuffer},
11};
12use vector_lib::lookup::lookup_v2::OptionalValuePath;
13use vector_lib::lookup::OwnedTargetPath;
14
/// Sampling strategy used by the `Sample` transform.
///
/// The transform's internals reason in terms of percentages; this enum exists for backwards
/// compatibility so that the value written to `sample_rate_key` keeps its previous form: a
/// `Rate` is rendered as the integer `N` ("keep one in `N`") and a `Ratio` as the configured
/// fraction.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Keep one event out of every `rate` events.
    Rate {
        /// Keep one event out of every `rate`.
        rate: u64,
        /// Events seen so far, tracked separately per `group_by` key (`None` when the event has
        /// no group key).
        counters: HashMap<Option<String>, u64>,
    },
    /// Keep approximately the `ratio` fraction of events.
    Ratio {
        /// Fraction of events to keep.
        ratio: f64,
        /// Running fractional accumulator, tracked separately per `group_by` key.
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled to the full `u64` range; a hashed `key_field` value at or below this
        /// threshold is kept.
        hash_ratio_threshold: u64,
    },
}
30
31impl SampleMode {
32    pub fn new_rate(rate: u64) -> Self {
33        Self::Rate {
34            rate,
35            counters: HashMap::default(),
36        }
37    }
38
39    pub fn new_ratio(ratio: f64) -> Self {
40        Self::Ratio {
41            ratio,
42            values: HashMap::default(),
43            // Supports the 'key_field' option, assuming an equal distribution of values for a given
44            // field, hashing its contents this component should output events according to the
45            // configured ratio.
46            //
47            // To do one option would be to convert the hash to a number between 0 and 1 and compare
48            // to the ratio. However to address issues with precision, here the ratio is scaled to
49            // meet the width of the type of the hash.
50            hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
51        }
52    }
53
54    fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
55        let threshold_exceeded = match self {
56            Self::Rate { rate, counters } => {
57                let counter_value = counters.entry(group_by_key.clone()).or_default();
58                let old_counter_value = *counter_value;
59                *counter_value += 1;
60                old_counter_value % *rate == 0
61            }
62            Self::Ratio { ratio, values, .. } => {
63                let value = values.entry(group_by_key.clone()).or_insert(1.0 - *ratio);
64                let increment: f64 = *value + *ratio;
65                *value = if increment >= 1.0 {
66                    increment - 1.0
67                } else {
68                    increment
69                };
70                increment >= 1.0
71            }
72        };
73        if let Some(value) = value {
74            self.hash_within_ratio(value.as_bytes())
75        } else {
76            threshold_exceeded
77        }
78    }
79
80    fn hash_within_ratio(&self, value: &[u8]) -> bool {
81        let hash = seahash::hash(value);
82        match self {
83            Self::Rate { rate, .. } => hash % rate == 0,
84            Self::Ratio {
85                hash_ratio_threshold,
86                ..
87            } => hash <= *hash_ratio_threshold,
88        }
89    }
90}
91
92impl fmt::Display for SampleMode {
93    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94        // Avoids the print of an additional '.0' which was not performed in the previous
95        // implementation
96        match self {
97            Self::Rate { rate, .. } => write!(f, "{rate}"),
98            Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
99        }
100    }
101}
102
103#[derive(Clone)]
104pub struct Sample {
105    name: String,
106    rate: SampleMode,
107    key_field: Option<String>,
108    group_by: Option<Template>,
109    exclude: Option<Condition>,
110    sample_rate_key: OptionalValuePath,
111}
112
113impl Sample {
114    // This function is dead code when the feature flag `transforms-impl-sample` is specified but not
115    // `transforms-sample`.
116    #![allow(dead_code)]
117    pub const fn new(
118        name: String,
119        rate: SampleMode,
120        key_field: Option<String>,
121        group_by: Option<Template>,
122        exclude: Option<Condition>,
123        sample_rate_key: OptionalValuePath,
124    ) -> Self {
125        Self {
126            name,
127            rate,
128            key_field,
129            group_by,
130            exclude,
131            sample_rate_key,
132        }
133    }
134
135    #[cfg(test)]
136    pub fn ratio(&self) -> f64 {
137        match self.rate {
138            SampleMode::Rate { rate, .. } => 1.0f64 / rate as f64,
139            SampleMode::Ratio { ratio, .. } => ratio,
140        }
141    }
142}
143
144impl FunctionTransform for Sample {
145    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
146        let mut event = {
147            if let Some(condition) = self.exclude.as_ref() {
148                let (result, event) = condition.check(event);
149                if result {
150                    output.push(event);
151                    return;
152                } else {
153                    event
154                }
155            } else {
156                event
157            }
158        };
159
160        let value = self
161            .key_field
162            .as_ref()
163            .and_then(|key_field| match &event {
164                Event::Log(event) => event
165                    .parse_path_and_get_value(key_field.as_str())
166                    .ok()
167                    .flatten(),
168                Event::Trace(event) => event
169                    .parse_path_and_get_value(key_field.as_str())
170                    .ok()
171                    .flatten(),
172                Event::Metric(_) => panic!("component can never receive metric events"),
173            })
174            .map(|v| v.to_string_lossy());
175
176        // Fetch actual field value if group_by option is set.
177        let group_by_key = self.group_by.as_ref().and_then(|group_by| match &event {
178            Event::Log(event) => group_by
179                .render_string(event)
180                .map_err(|error| {
181                    emit!(TemplateRenderingError {
182                        error,
183                        field: Some("group_by"),
184                        drop_event: false,
185                    })
186                })
187                .ok(),
188            Event::Trace(event) => group_by
189                .render_string(event)
190                .map_err(|error| {
191                    emit!(TemplateRenderingError {
192                        error,
193                        field: Some("group_by"),
194                        drop_event: false,
195                    })
196                })
197                .ok(),
198            Event::Metric(_) => panic!("component can never receive metric events"),
199        });
200
201        let should_sample = self.rate.increment(&group_by_key, &value);
202        if should_sample {
203            if let Some(path) = &self.sample_rate_key.path {
204                match event {
205                    Event::Log(ref mut event) => {
206                        event.namespace().insert_source_metadata(
207                            self.name.as_str(),
208                            event,
209                            Some(LegacyKey::Overwrite(path)),
210                            path,
211                            self.rate.to_string(),
212                        );
213                    }
214                    Event::Trace(ref mut event) => {
215                        event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
216                    }
217                    Event::Metric(_) => panic!("component can never receive metric events"),
218                };
219            }
220            output.push(event);
221        } else {
222            emit!(SampleEventDiscarded);
223        }
224    }
225}