vector/transforms/sample/
transform.rs1use std::{collections::HashMap, fmt};
2
3use vector_lib::{
4 config::LegacyKey,
5 lookup::{OwnedTargetPath, lookup_v2::OptionalValuePath},
6};
7
8use crate::{
9 conditions::Condition,
10 event::{Event, Value},
11 internal_events::SampleEventDiscarded,
12 sinks::prelude::TemplateRenderingError,
13 template::Template,
14 transforms::{FunctionTransform, OutputBuffer},
15};
16
/// Sampling strategy, bundled with the mutable bookkeeping each strategy needs.
#[derive(Clone, Debug)]
pub enum SampleMode {
    /// Forward one event out of every `rate` events (per group).
    Rate {
        /// Sampling denominator.
        rate: u64,
        /// Number of events seen so far, keyed by the rendered group (`None` when no group
        /// applies).
        counters: HashMap<Option<String>, u64>,
    },
    /// Forward approximately `ratio` of events (per group).
    Ratio {
        /// Fraction of events to forward.
        ratio: f64,
        /// Fractional accumulator per rendered group (`None` when no group applies).
        values: HashMap<Option<String>, f64>,
        /// `ratio` scaled to the width of a `u64` hash; compared directly against key-field
        /// hashes.
        hash_ratio_threshold: u64,
    },
}
32
33impl SampleMode {
34 pub fn new_rate(rate: u64) -> Self {
35 Self::Rate {
36 rate,
37 counters: HashMap::default(),
38 }
39 }
40
41 pub fn new_ratio(ratio: f64) -> Self {
42 Self::Ratio {
43 ratio,
44 values: HashMap::default(),
45 hash_ratio_threshold: (ratio * (u64::MAX as u128) as f64) as u64,
53 }
54 }
55
56 fn increment(&mut self, group_by_key: Option<String>, value: Option<&Value>) -> bool {
57 let threshold_exceeded = match self {
58 Self::Rate { rate, counters } => {
59 let counter_value = counters.entry(group_by_key).or_default();
60 let old_counter_value = *counter_value;
61 *counter_value += 1;
62 old_counter_value % *rate == 0
63 }
64 Self::Ratio { ratio, values, .. } => {
65 let value = values.entry(group_by_key).or_insert(1.0 - *ratio);
66 let increment: f64 = *value + *ratio;
67 *value = if increment >= 1.0 {
68 increment - 1.0
69 } else {
70 increment
71 };
72 increment >= 1.0
73 }
74 };
75 if let Some(value) = value {
76 self.hash_within_ratio(value.to_string_lossy().as_bytes())
77 } else {
78 threshold_exceeded
79 }
80 }
81
82 fn hash_within_ratio(&self, value: &[u8]) -> bool {
83 let hash = seahash::hash(value);
84 match self {
85 Self::Rate { rate, .. } => hash.is_multiple_of(*rate),
86 Self::Ratio {
87 hash_ratio_threshold,
88 ..
89 } => hash <= *hash_ratio_threshold,
90 }
91 }
92}
93
94impl fmt::Display for SampleMode {
95 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
96 match self {
99 Self::Rate { rate, .. } => write!(f, "{rate}"),
100 Self::Ratio { ratio, .. } => write!(f, "{ratio}"),
101 }
102 }
103}
104
105#[derive(Clone)]
106pub struct Sample {
107 name: String,
108 rate: SampleMode,
109 key_field: Option<String>,
110 group_by: Option<Template>,
111 exclude: Option<Condition>,
112 sample_rate_key: OptionalValuePath,
113}
114
115impl Sample {
116 #![allow(dead_code)]
119 pub const fn new(
120 name: String,
121 rate: SampleMode,
122 key_field: Option<String>,
123 group_by: Option<Template>,
124 exclude: Option<Condition>,
125 sample_rate_key: OptionalValuePath,
126 ) -> Self {
127 Self {
128 name,
129 rate,
130 key_field,
131 group_by,
132 exclude,
133 sample_rate_key,
134 }
135 }
136
137 #[cfg(test)]
138 pub fn ratio(&self) -> f64 {
139 match self.rate {
140 SampleMode::Rate { rate, .. } => 1.0f64 / rate as f64,
141 SampleMode::Ratio { ratio, .. } => ratio,
142 }
143 }
144}
145
146impl FunctionTransform for Sample {
147 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
148 let mut event = {
149 if let Some(condition) = self.exclude.as_ref() {
150 let (result, event) = condition.check(event);
151 if result {
152 output.push(event);
153 return;
154 } else {
155 event
156 }
157 } else {
158 event
159 }
160 };
161
162 let value = self.key_field.as_ref().and_then(|key_field| match &event {
163 Event::Log(event) => event
164 .parse_path_and_get_value(key_field.as_str())
165 .ok()
166 .flatten(),
167 Event::Trace(event) => event
168 .parse_path_and_get_value(key_field.as_str())
169 .ok()
170 .flatten(),
171 Event::Metric(_) => panic!("component can never receive metric events"),
172 });
173
174 let group_by_key = self.group_by.as_ref().and_then(|group_by| {
176 match &event {
177 Event::Log(event) => group_by.render_string(event),
178 Event::Trace(event) => group_by.render_string(event),
179 Event::Metric(_) => panic!("component can never receive metric events"),
180 }
181 .map_err(|error| {
182 emit!(TemplateRenderingError {
183 error,
184 field: Some("group_by"),
185 drop_event: false,
186 })
187 })
188 .ok()
189 });
190
191 if self.rate.increment(group_by_key, value) {
192 if let Some(path) = &self.sample_rate_key.path {
193 match event {
194 Event::Log(ref mut event) => {
195 event.namespace().insert_source_metadata(
196 self.name.as_str(),
197 event,
198 Some(LegacyKey::Overwrite(path)),
199 path,
200 self.rate.to_string(),
201 );
202 }
203 Event::Trace(ref mut event) => {
204 event.insert(&OwnedTargetPath::event(path.clone()), self.rate.to_string());
205 }
206 Event::Metric(_) => panic!("component can never receive metric events"),
207 };
208 }
209 output.push(event);
210 } else {
211 emit!(SampleEventDiscarded);
212 }
213 }
214}