vector/transforms/sample/
transform.rs1use std::{borrow::Cow, collections::HashMap, fmt};
2use vector_lib::config::LegacyKey;
3
4use crate::{
5 conditions::Condition,
6 event::Event,
7 internal_events::SampleEventDiscarded,
8 sinks::prelude::TemplateRenderingError,
9 template::Template,
10 transforms::{FunctionTransform, OutputBuffer},
11};
12use vector_lib::lookup::lookup_v2::OptionalValuePath;
13use vector_lib::lookup::OwnedTargetPath;
14
/// Sampling strategy used by the `Sample` transform, together with its
/// mutable per-group bookkeeping.
///
/// State is kept in maps keyed by the rendered `group_by` value. The key is
/// `None` when no `group_by` template is configured or when rendering it
/// fails, so each group is sampled independently of the others.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Keep one event out of every `rate` events.
    Rate {
        /// Keep-one-in-N divisor. It is used as a modulus, so a value of `0`
        /// panics when an event is evaluated.
        rate: u64,
        /// Number of events seen so far, per group key.
        counters: HashMap<Option<String>, u64>,
    },
    /// Keep roughly a `ratio` fraction of events.
    Ratio {
        /// Fraction of events to keep (presumably validated to lie in (0, 1]
        /// elsewhere; not checked here).
        ratio: f64,
        /// Per-group fractional accumulator. An event is kept each time
        /// adding `ratio` makes it reach `1.0`.
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled onto the `u64` range. When a key-field value is
        /// present, the event is kept if its hash is at or below this value.
        hash_ratio_threshold: u64,
    },
}
30
31impl SampleMode {
32 pub fn new_rate(rate: u64) -> Self {
33 Self::Rate {
34 rate,
35 counters: HashMap::default(),
36 }
37 }
38
39 pub fn new_ratio(ratio: f64) -> Self {
40 Self::Ratio {
41 ratio,
42 values: HashMap::default(),
43 hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
51 }
52 }
53
54 fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
55 let threshold_exceeded = match self {
56 Self::Rate { rate, counters } => {
57 let counter_value = counters.entry(group_by_key.clone()).or_default();
58 let old_counter_value = *counter_value;
59 *counter_value += 1;
60 old_counter_value % *rate == 0
61 }
62 Self::Ratio { ratio, values, .. } => {
63 let value = values.entry(group_by_key.clone()).or_insert(1.0 - *ratio);
64 let increment: f64 = *value + *ratio;
65 *value = if increment >= 1.0 {
66 increment - 1.0
67 } else {
68 increment
69 };
70 increment >= 1.0
71 }
72 };
73 if let Some(value) = value {
74 self.hash_within_ratio(value.as_bytes())
75 } else {
76 threshold_exceeded
77 }
78 }
79
80 fn hash_within_ratio(&self, value: &[u8]) -> bool {
81 let hash = seahash::hash(value);
82 match self {
83 Self::Rate { rate, .. } => hash % rate == 0,
84 Self::Ratio {
85 hash_ratio_threshold,
86 ..
87 } => hash <= *hash_ratio_threshold,
88 }
89 }
90}
91
92impl fmt::Display for SampleMode {
93 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94 match self {
97 Self::Rate { rate, .. } => write!(f, "{rate}"),
98 Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
99 }
100 }
101}
102
/// The `sample` transform. It forwards a subset of events according to a
/// [`SampleMode`], and events matching `exclude` bypass sampling.
#[derive(Clone)]
pub struct Sample {
    /// Component name. It is passed to `insert_source_metadata` when tagging
    /// kept log events.
    name: String,
    /// Sampling strategy and its mutable per-group state.
    rate: SampleMode,
    /// Optional event field. When the field is present, its value is hashed
    /// to make the sampling decision.
    key_field: Option<String>,
    /// Optional template, rendered per event to select the sampling group.
    group_by: Option<Template>,
    /// Events for which this condition passes are forwarded without sampling.
    exclude: Option<Condition>,
    /// Where to record the sampling parameter on kept events. Nothing is
    /// recorded when no path is set.
    sample_rate_key: OptionalValuePath,
}
112
113impl Sample {
114 #![allow(dead_code)]
117 pub const fn new(
118 name: String,
119 rate: SampleMode,
120 key_field: Option<String>,
121 group_by: Option<Template>,
122 exclude: Option<Condition>,
123 sample_rate_key: OptionalValuePath,
124 ) -> Self {
125 Self {
126 name,
127 rate,
128 key_field,
129 group_by,
130 exclude,
131 sample_rate_key,
132 }
133 }
134
135 #[cfg(test)]
136 pub fn ratio(&self) -> f64 {
137 match self.rate {
138 SampleMode::Rate { rate, .. } => 1.0f64 / rate as f64,
139 SampleMode::Ratio { ratio, .. } => ratio,
140 }
141 }
142}
143
144impl FunctionTransform for Sample {
145 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
146 let mut event = {
147 if let Some(condition) = self.exclude.as_ref() {
148 let (result, event) = condition.check(event);
149 if result {
150 output.push(event);
151 return;
152 } else {
153 event
154 }
155 } else {
156 event
157 }
158 };
159
160 let value = self
161 .key_field
162 .as_ref()
163 .and_then(|key_field| match &event {
164 Event::Log(event) => event
165 .parse_path_and_get_value(key_field.as_str())
166 .ok()
167 .flatten(),
168 Event::Trace(event) => event
169 .parse_path_and_get_value(key_field.as_str())
170 .ok()
171 .flatten(),
172 Event::Metric(_) => panic!("component can never receive metric events"),
173 })
174 .map(|v| v.to_string_lossy());
175
176 let group_by_key = self.group_by.as_ref().and_then(|group_by| match &event {
178 Event::Log(event) => group_by
179 .render_string(event)
180 .map_err(|error| {
181 emit!(TemplateRenderingError {
182 error,
183 field: Some("group_by"),
184 drop_event: false,
185 })
186 })
187 .ok(),
188 Event::Trace(event) => group_by
189 .render_string(event)
190 .map_err(|error| {
191 emit!(TemplateRenderingError {
192 error,
193 field: Some("group_by"),
194 drop_event: false,
195 })
196 })
197 .ok(),
198 Event::Metric(_) => panic!("component can never receive metric events"),
199 });
200
201 let should_sample = self.rate.increment(&group_by_key, &value);
202 if should_sample {
203 if let Some(path) = &self.sample_rate_key.path {
204 match event {
205 Event::Log(ref mut event) => {
206 event.namespace().insert_source_metadata(
207 self.name.as_str(),
208 event,
209 Some(LegacyKey::Overwrite(path)),
210 path,
211 self.rate.to_string(),
212 );
213 }
214 Event::Trace(ref mut event) => {
215 event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
216 }
217 Event::Metric(_) => panic!("component can never receive metric events"),
218 };
219 }
220 output.push(event);
221 } else {
222 emit!(SampleEventDiscarded);
223 }
224 }
225}