vector/transforms/sample/
transform.rs1use std::{borrow::Cow, collections::HashMap, fmt};
2
3use vector_lib::{
4 config::LegacyKey,
5 lookup::{OwnedTargetPath, lookup_v2::OptionalValuePath},
6};
7
8use crate::{
9 conditions::Condition,
10 event::Event,
11 internal_events::SampleEventDiscarded,
12 sinks::prelude::TemplateRenderingError,
13 template::Template,
14 transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Sampling strategy together with the per-group state it accumulates.
///
/// Both variants key their state by the optional group key; `None` is the
/// bucket used when no group key is available for an event.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Counter-based sampling driven by an integer `rate`.
    Rate {
        /// Divisor applied to a group's event counter (and to a key hash).
        rate: u64,
        /// Number of events counted so far for each group key.
        counters: HashMap<Option<String>, u64>,
    },
    /// Accumulator-based sampling driven by a fractional `ratio`.
    Ratio {
        /// Amount added to a group's accumulator for every event.
        ratio: f64,
        /// Running accumulator for each group key.
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled to the `u64` range; key hashes at or below it pass.
        hash_ratio_threshold: u64,
    },
}
32
33impl SampleMode {
34 pub fn new_rate(rate: u64) -> Self {
35 Self::Rate {
36 rate,
37 counters: HashMap::default(),
38 }
39 }
40
41 pub fn new_ratio(ratio: f64) -> Self {
42 Self::Ratio {
43 ratio,
44 values: HashMap::default(),
45 hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
53 }
54 }
55
56 fn increment(&mut self, group_by_key: &Option<String>, value: &Option<Cow<'_, str>>) -> bool {
57 let threshold_exceeded = match self {
58 Self::Rate { rate, counters } => {
59 let counter_value = counters.entry(group_by_key.clone()).or_default();
60 let old_counter_value = *counter_value;
61 *counter_value += 1;
62 old_counter_value % *rate == 0
63 }
64 Self::Ratio { ratio, values, .. } => {
65 let value = values.entry(group_by_key.clone()).or_insert(1.0 - *ratio);
66 let increment: f64 = *value + *ratio;
67 *value = if increment >= 1.0 {
68 increment - 1.0
69 } else {
70 increment
71 };
72 increment >= 1.0
73 }
74 };
75 if let Some(value) = value {
76 self.hash_within_ratio(value.as_bytes())
77 } else {
78 threshold_exceeded
79 }
80 }
81
82 fn hash_within_ratio(&self, value: &[u8]) -> bool {
83 let hash = seahash::hash(value);
84 match self {
85 Self::Rate { rate, .. } => hash % rate == 0,
86 Self::Ratio {
87 hash_ratio_threshold,
88 ..
89 } => hash <= *hash_ratio_threshold,
90 }
91 }
92}
93
94impl fmt::Display for SampleMode {
95 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96 match self {
99 Self::Rate { rate, .. } => write!(f, "{rate}"),
100 Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
101 }
102 }
103}
104
105#[derive(Clone)]
106pub struct Sample {
107 name: String,
108 rate: SampleMode,
109 key_field: Option<String>,
110 group_by: Option<Template>,
111 exclude: Option<Condition>,
112 sample_rate_key: OptionalValuePath,
113}
114
115impl Sample {
116 #![allow(dead_code)]
119 pub const fn new(
120 name: String,
121 rate: SampleMode,
122 key_field: Option<String>,
123 group_by: Option<Template>,
124 exclude: Option<Condition>,
125 sample_rate_key: OptionalValuePath,
126 ) -> Self {
127 Self {
128 name,
129 rate,
130 key_field,
131 group_by,
132 exclude,
133 sample_rate_key,
134 }
135 }
136
137 #[cfg(test)]
138 pub fn ratio(&self) -> f64 {
139 match self.rate {
140 SampleMode::Rate { rate, .. } => 1.0f64 / rate as f64,
141 SampleMode::Ratio { ratio, .. } => ratio,
142 }
143 }
144}
145
146impl FunctionTransform for Sample {
147 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
148 let mut event = {
149 if let Some(condition) = self.exclude.as_ref() {
150 let (result, event) = condition.check(event);
151 if result {
152 output.push(event);
153 return;
154 } else {
155 event
156 }
157 } else {
158 event
159 }
160 };
161
162 let value = self
163 .key_field
164 .as_ref()
165 .and_then(|key_field| match &event {
166 Event::Log(event) => event
167 .parse_path_and_get_value(key_field.as_str())
168 .ok()
169 .flatten(),
170 Event::Trace(event) => event
171 .parse_path_and_get_value(key_field.as_str())
172 .ok()
173 .flatten(),
174 Event::Metric(_) => panic!("component can never receive metric events"),
175 })
176 .map(|v| v.to_string_lossy());
177
178 let group_by_key = self.group_by.as_ref().and_then(|group_by| match &event {
180 Event::Log(event) => group_by
181 .render_string(event)
182 .map_err(|error| {
183 emit!(TemplateRenderingError {
184 error,
185 field: Some("group_by"),
186 drop_event: false,
187 })
188 })
189 .ok(),
190 Event::Trace(event) => group_by
191 .render_string(event)
192 .map_err(|error| {
193 emit!(TemplateRenderingError {
194 error,
195 field: Some("group_by"),
196 drop_event: false,
197 })
198 })
199 .ok(),
200 Event::Metric(_) => panic!("component can never receive metric events"),
201 });
202
203 let should_sample = self.rate.increment(&group_by_key, &value);
204 if should_sample {
205 if let Some(path) = &self.sample_rate_key.path {
206 match event {
207 Event::Log(ref mut event) => {
208 event.namespace().insert_source_metadata(
209 self.name.as_str(),
210 event,
211 Some(LegacyKey::Overwrite(path)),
212 path,
213 self.rate.to_string(),
214 );
215 }
216 Event::Trace(ref mut event) => {
217 event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
218 }
219 Event::Metric(_) => panic!("component can never receive metric events"),
220 };
221 }
222 output.push(event);
223 } else {
224 emit!(SampleEventDiscarded);
225 }
226 }
227}