vector/transforms/throttle/
transform.rs1use async_stream::stream;
2use futures::{Stream, StreamExt};
3use governor::{clock, Quota};
4use snafu::Snafu;
5use std::hash::Hash;
6use std::{num::NonZeroU32, pin::Pin, time::Duration};
7
8use super::{
9 config::{ThrottleConfig, ThrottleInternalMetricsConfig},
10 rate_limiter::RateLimiterRunner,
11};
12use crate::{
13 conditions::Condition,
14 config::TransformContext,
15 event::Event,
16 internal_events::{TemplateRenderingError, ThrottleEventDiscarded},
17 template::Template,
18 transforms::TaskTransform,
19};
20
21#[derive(Clone)]
22pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
23 pub quota: Quota,
24 pub flush_keys_interval: Duration,
25 key_field: Option<Template>,
26 exclude: Option<Condition>,
27 pub clock: C,
28 internal_metrics: ThrottleInternalMetricsConfig,
29}
30
31impl<C, I> Throttle<C, I>
32where
33 C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
34 I: clock::Reference,
35{
36 pub fn new(
37 config: &ThrottleConfig,
38 context: &TransformContext,
39 clock: C,
40 ) -> crate::Result<Self> {
41 let flush_keys_interval = config.window_secs;
42
43 let threshold = match NonZeroU32::new(config.threshold) {
44 Some(threshold) => threshold,
45 None => return Err(Box::new(ConfigError::NonZero)),
46 };
47
48 let quota = match Quota::with_period(Duration::from_secs_f64(
49 flush_keys_interval.as_secs_f64() / f64::from(threshold.get()),
50 )) {
51 Some(quota) => quota.allow_burst(threshold),
52 None => return Err(Box::new(ConfigError::NonZero)),
53 };
54 let exclude = config
55 .exclude
56 .as_ref()
57 .map(|condition| condition.build(&context.enrichment_tables))
58 .transpose()?;
59
60 Ok(Self {
61 quota,
62 clock,
63 flush_keys_interval,
64 key_field: config.key_field.clone(),
65 exclude,
66 internal_metrics: config.internal_metrics.clone(),
67 })
68 }
69
70 #[must_use]
71 pub fn start_rate_limiter<K>(&self) -> RateLimiterRunner<K, C>
72 where
73 K: Hash + Eq + Clone + Send + Sync + 'static,
74 {
75 RateLimiterRunner::start(self.quota, self.clock.clone(), self.flush_keys_interval)
76 }
77
78 pub fn emit_event_discarded(&self, key: String) {
79 emit!(ThrottleEventDiscarded {
80 key,
81 emit_events_discarded_per_key: self.internal_metrics.emit_events_discarded_per_key
82 });
83 }
84}
85
86impl<C, I> TaskTransform<Event> for Throttle<C, I>
87where
88 C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
89 I: clock::Reference + Send + 'static,
90{
91 fn transform(
92 self: Box<Self>,
93 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
94 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
95 where
96 Self: 'static,
97 {
98 let limiter = self.start_rate_limiter();
99
100 Box::pin(stream! {
101 while let Some(event) = input_rx.next().await {
102 let (throttle, event) = match self.exclude.as_ref() {
103 Some(condition) => {
104 let (result, event) = condition.check(event);
105 (!result, event)
106 },
107 _ => (true, event)
108 };
109 let output = if throttle {
110 let key = self.key_field.as_ref().and_then(|t| {
111 t.render_string(&event)
112 .map_err(|error| {
113 emit!(TemplateRenderingError {
114 error,
115 field: Some("key_field"),
116 drop_event: false,
117 })
118 })
119 .ok()
120 });
121
122 if limiter.check_key(&key) {
123 Some(event)
124 } else {
125 self.emit_event_discarded(key.unwrap_or_else(|| "None".to_string()));
126 None
127 }
128 } else {
129 Some(event)
130 };
131 if let Some(event) = output {
132 yield event;
133 }
134 }
135 })
136 }
137}
138
139#[derive(Debug, Snafu)]
140pub enum ConfigError {
141 #[snafu(display("`threshold`, and `window_secs` must be non-zero"))]
142 NonZero,
143}
144
145#[cfg(test)]
146mod tests {
147 use std::task::Poll;
148
149 use futures::SinkExt;
150
151 use super::*;
152 use crate::transforms::Transform;
153 use crate::{
154 event::LogEvent, test_util::components::assert_transform_compliance,
155 transforms::test::create_topology,
156 };
157 use tokio::sync::mpsc;
158 use tokio_stream::wrappers::ReceiverStream;
159
160 #[tokio::test]
161 async fn throttle_events() {
162 let clock = clock::FakeRelativeClock::default();
163 let config = toml::from_str::<ThrottleConfig>(
164 r"
165threshold = 2
166window_secs = 5
167",
168 )
169 .unwrap();
170
171 let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
172 .map(Transform::event_task)
173 .unwrap();
174
175 let throttle = throttle.into_task();
176
177 let (mut tx, rx) = futures::channel::mpsc::channel(10);
178 let mut out_stream = throttle.transform_events(Box::pin(rx));
179
180 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
183
184 tx.send(LogEvent::default().into()).await.unwrap();
185 tx.send(LogEvent::default().into()).await.unwrap();
186
187 let mut count = 0_u8;
188 while count < 2 {
189 match out_stream.next().await {
190 Some(_event) => {
191 count += 1;
192 }
193 _ => {
194 panic!("Unexpectedly received None in output stream");
195 }
196 }
197 }
198 assert_eq!(2, count);
199
200 clock.advance(Duration::from_secs(2));
201
202 tx.send(LogEvent::default().into()).await.unwrap();
203
204 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
206
207 clock.advance(Duration::from_secs(3));
208
209 tx.send(LogEvent::default().into()).await.unwrap();
210
211 match out_stream.next().await {
213 Some(_event) => {}
214 _ => {
215 panic!("Unexpectedly received None in output stream");
216 }
217 }
218
219 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
221
222 tx.disconnect();
223
224 assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
226 }
227
228 #[tokio::test]
229 async fn throttle_exclude() {
230 let clock = clock::FakeRelativeClock::default();
231 let config = toml::from_str::<ThrottleConfig>(
232 r#"
233threshold = 2
234window_secs = 5
235exclude = """
236exists(.special)
237"""
238"#,
239 )
240 .unwrap();
241
242 let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
243 .map(Transform::event_task)
244 .unwrap();
245
246 let throttle = throttle.into_task();
247
248 let (mut tx, rx) = futures::channel::mpsc::channel(10);
249 let mut out_stream = throttle.transform_events(Box::pin(rx));
250
251 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
254
255 tx.send(LogEvent::default().into()).await.unwrap();
256 tx.send(LogEvent::default().into()).await.unwrap();
257
258 let mut count = 0_u8;
259 while count < 2 {
260 match out_stream.next().await {
261 Some(_event) => {
262 count += 1;
263 }
264 _ => {
265 panic!("Unexpectedly received None in output stream");
266 }
267 }
268 }
269 assert_eq!(2, count);
270
271 clock.advance(Duration::from_secs(2));
272
273 tx.send(LogEvent::default().into()).await.unwrap();
274
275 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
277
278 let mut special_log = LogEvent::default();
279 special_log.insert("special", "true");
280 tx.send(special_log.into()).await.unwrap();
281 match out_stream.next().await {
283 Some(_event) => {}
284 _ => {
285 panic!("Unexpectedly received None in output stream");
286 }
287 }
288
289 clock.advance(Duration::from_secs(3));
290
291 tx.send(LogEvent::default().into()).await.unwrap();
292
293 match out_stream.next().await {
295 Some(_event) => {}
296 _ => {
297 panic!("Unexpectedly received None in output stream");
298 }
299 }
300
301 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
303
304 tx.disconnect();
305
306 assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
308 }
309
310 #[tokio::test]
311 async fn throttle_buckets() {
312 let clock = clock::FakeRelativeClock::default();
313 let config = toml::from_str::<ThrottleConfig>(
314 r#"
315threshold = 1
316window_secs = 5
317key_field = "{{ bucket }}"
318"#,
319 )
320 .unwrap();
321
322 let throttle = Throttle::new(&config, &TransformContext::default(), clock.clone())
323 .map(Transform::event_task)
324 .unwrap();
325
326 let throttle = throttle.into_task();
327
328 let (mut tx, rx) = futures::channel::mpsc::channel(10);
329 let mut out_stream = throttle.transform_events(Box::pin(rx));
330
331 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
334
335 let mut log_a = LogEvent::default();
336 log_a.insert("bucket", "a");
337 let mut log_b = LogEvent::default();
338 log_b.insert("bucket", "b");
339 tx.send(log_a.into()).await.unwrap();
340 tx.send(log_b.into()).await.unwrap();
341
342 let mut count = 0_u8;
343 while count < 2 {
344 match out_stream.next().await {
345 Some(_event) => {
346 count += 1;
347 }
348 _ => {
349 panic!("Unexpectedly received None in output stream");
350 }
351 }
352 }
353 assert_eq!(2, count);
354
355 assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
357
358 tx.disconnect();
359
360 assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
362 }
363
364 #[tokio::test]
365 async fn emits_internal_events() {
366 assert_transform_compliance(async move {
367 let config = ThrottleConfig {
368 threshold: 1,
369 window_secs: Duration::from_secs_f64(1.0),
370 key_field: None,
371 exclude: None,
372 internal_metrics: Default::default(),
373 };
374 let (tx, rx) = mpsc::channel(1);
375 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
376
377 let log = LogEvent::from("hello world");
378 tx.send(log.into()).await.unwrap();
379
380 _ = out.recv().await;
381
382 drop(tx);
383 topology.stop().await;
384 assert_eq!(out.recv().await, None);
385 })
386 .await
387 }
388}