vector/transforms/throttle/
transform.rs

1use async_stream::stream;
2use futures::{Stream, StreamExt};
3use governor::{clock, Quota};
4use snafu::Snafu;
5use std::hash::Hash;
6use std::{num::NonZeroU32, pin::Pin, time::Duration};
7
8use super::{
9    config::{ThrottleConfig, ThrottleInternalMetricsConfig},
10    rate_limiter::RateLimiterRunner,
11};
12use crate::{
13    conditions::Condition,
14    config::TransformContext,
15    event::Event,
16    internal_events::{TemplateRenderingError, ThrottleEventDiscarded},
17    template::Template,
18    transforms::TaskTransform,
19};
20
21#[derive(Clone)]
22pub struct Throttle<C: clock::Clock<Instant = I>, I: clock::Reference> {
23    pub quota: Quota,
24    pub flush_keys_interval: Duration,
25    key_field: Option<Template>,
26    exclude: Option<Condition>,
27    pub clock: C,
28    internal_metrics: ThrottleInternalMetricsConfig,
29}
30
31impl<C, I> Throttle<C, I>
32where
33    C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
34    I: clock::Reference,
35{
36    pub fn new(
37        config: &ThrottleConfig,
38        context: &TransformContext,
39        clock: C,
40    ) -> crate::Result<Self> {
41        let flush_keys_interval = config.window_secs;
42
43        let threshold = match NonZeroU32::new(config.threshold) {
44            Some(threshold) => threshold,
45            None => return Err(Box::new(ConfigError::NonZero)),
46        };
47
48        let quota = match Quota::with_period(Duration::from_secs_f64(
49            flush_keys_interval.as_secs_f64() / f64::from(threshold.get()),
50        )) {
51            Some(quota) => quota.allow_burst(threshold),
52            None => return Err(Box::new(ConfigError::NonZero)),
53        };
54        let exclude = config
55            .exclude
56            .as_ref()
57            .map(|condition| condition.build(&context.enrichment_tables))
58            .transpose()?;
59
60        Ok(Self {
61            quota,
62            clock,
63            flush_keys_interval,
64            key_field: config.key_field.clone(),
65            exclude,
66            internal_metrics: config.internal_metrics.clone(),
67        })
68    }
69
70    #[must_use]
71    pub fn start_rate_limiter<K>(&self) -> RateLimiterRunner<K, C>
72    where
73        K: Hash + Eq + Clone + Send + Sync + 'static,
74    {
75        RateLimiterRunner::start(self.quota, self.clock.clone(), self.flush_keys_interval)
76    }
77
78    pub fn emit_event_discarded(&self, key: String) {
79        emit!(ThrottleEventDiscarded {
80            key,
81            emit_events_discarded_per_key: self.internal_metrics.emit_events_discarded_per_key
82        });
83    }
84}
85
86impl<C, I> TaskTransform<Event> for Throttle<C, I>
87where
88    C: clock::Clock<Instant = I> + Clone + Send + Sync + 'static,
89    I: clock::Reference + Send + 'static,
90{
91    fn transform(
92        self: Box<Self>,
93        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
94    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
95    where
96        Self: 'static,
97    {
98        let limiter = self.start_rate_limiter();
99
100        Box::pin(stream! {
101            while let Some(event) = input_rx.next().await {
102                let (throttle, event) = match self.exclude.as_ref() {
103                    Some(condition) => {
104                        let (result, event) = condition.check(event);
105                        (!result, event)
106                    },
107                    _ => (true, event)
108                };
109                let output = if throttle {
110                    let key = self.key_field.as_ref().and_then(|t| {
111                        t.render_string(&event)
112                            .map_err(|error| {
113                                emit!(TemplateRenderingError {
114                                    error,
115                                    field: Some("key_field"),
116                                    drop_event: false,
117                                })
118                            })
119                            .ok()
120                    });
121
122                    if limiter.check_key(&key) {
123                        Some(event)
124                    } else {
125                        self.emit_event_discarded(key.unwrap_or_else(|| "None".to_string()));
126                        None
127                    }
128                } else {
129                    Some(event)
130                };
131                if let Some(event) = output {
132                    yield event;
133                }
134            }
135        })
136    }
137}
138
139#[derive(Debug, Snafu)]
140pub enum ConfigError {
141    #[snafu(display("`threshold`, and `window_secs` must be non-zero"))]
142    NonZero,
143}
144
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use futures::SinkExt;

    use super::*;
    use crate::transforms::Transform;
    use crate::{
        event::LogEvent, test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;

    /// Parses `config_toml`, builds a throttle driven by `clock`, and wires it
    /// to a fresh channel. Returns the sending half and the transform's output.
    fn spawn_throttle(
        config_toml: &str,
        clock: clock::FakeRelativeClock,
    ) -> (
        futures::channel::mpsc::Sender<Event>,
        impl Stream<Item = Event> + Unpin,
    ) {
        let config = toml::from_str::<ThrottleConfig>(config_toml).unwrap();
        let task = Throttle::new(&config, &TransformContext::default(), clock)
            .map(Transform::event_task)
            .unwrap()
            .into_task();

        let (tx, rx) = futures::channel::mpsc::channel(10);
        (tx, task.transform_events(Box::pin(rx)))
    }

    /// Waits for the next output item and panics if the stream has ended.
    async fn expect_event<S>(out_stream: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        if out_stream.next().await.is_none() {
            panic!("Unexpectedly received None in output stream");
        }
    }

    /// Polls the output exactly once and asserts that nothing is ready.
    async fn assert_pending<S>(out_stream: &mut S)
    where
        S: Stream<Item = Event> + Unpin,
    {
        assert_eq!(Poll::Pending, futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn throttle_events() {
        let clock = clock::FakeRelativeClock::default();
        let (mut tx, mut out_stream) = spawn_throttle(
            r"
threshold = 2
window_secs = 5
",
            clock.clone(),
        );

        // Poll once before sending anything. Per the original author's note,
        // a tokio interval is immediately ready, so this first poll trips it
        // and moves the interval into the future.
        assert_pending(&mut out_stream).await;

        for _ in 0..2 {
            tx.send(LogEvent::default().into()).await.unwrap();
        }

        // Both events fit inside the burst allowance of 2.
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        clock.advance(Duration::from_secs(2));

        tx.send(LogEvent::default().into()).await.unwrap();

        // The third event exceeds the allowance and is dropped, so the output
        // is pending again.
        assert_pending(&mut out_stream).await;

        clock.advance(Duration::from_secs(3));

        tx.send(LogEvent::default().into()).await.unwrap();

        // The full window has elapsed, so the limiter lets another event through.
        expect_event(&mut out_stream).await;

        // Nothing else is waiting for us.
        assert_pending(&mut out_stream).await;

        tx.disconnect();

        // With the sender gone, the output stream ends.
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn throttle_exclude() {
        let clock = clock::FakeRelativeClock::default();
        let (mut tx, mut out_stream) = spawn_throttle(
            r#"
threshold = 2
window_secs = 5
exclude = """
exists(.special)
"""
"#,
            clock.clone(),
        );

        // Poll once before sending anything. Per the original author's note,
        // a tokio interval is immediately ready, so this first poll trips it
        // and moves the interval into the future.
        assert_pending(&mut out_stream).await;

        for _ in 0..2 {
            tx.send(LogEvent::default().into()).await.unwrap();
        }

        // Both events fit inside the burst allowance of 2.
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        clock.advance(Duration::from_secs(2));

        tx.send(LogEvent::default().into()).await.unwrap();

        // The third event exceeds the allowance and is dropped, so the output
        // is pending again.
        assert_pending(&mut out_stream).await;

        let mut special_log = LogEvent::default();
        special_log.insert("special", "true");
        tx.send(special_log.into()).await.unwrap();

        // Events matching `exclude` pass regardless of the current limit.
        expect_event(&mut out_stream).await;

        clock.advance(Duration::from_secs(3));

        tx.send(LogEvent::default().into()).await.unwrap();

        // The full window has elapsed, so the limiter lets another event through.
        expect_event(&mut out_stream).await;

        // Nothing else is waiting for us.
        assert_pending(&mut out_stream).await;

        tx.disconnect();

        // With the sender gone, the output stream ends.
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn throttle_buckets() {
        let clock = clock::FakeRelativeClock::default();
        let (mut tx, mut out_stream) = spawn_throttle(
            r#"
threshold = 1
window_secs = 5
key_field = "{{ bucket }}"
"#,
            clock.clone(),
        );

        // Poll once before sending anything. Per the original author's note,
        // a tokio interval is immediately ready, so this first poll trips it
        // and moves the interval into the future.
        assert_pending(&mut out_stream).await;

        let mut log_a = LogEvent::default();
        log_a.insert("bucket", "a");
        let mut log_b = LogEvent::default();
        log_b.insert("bucket", "b");
        tx.send(log_a.into()).await.unwrap();
        tx.send(log_b.into()).await.unwrap();

        // Each bucket has its own allowance of 1, so both events pass.
        for _ in 0..2 {
            expect_event(&mut out_stream).await;
        }

        // Nothing else is waiting for us.
        assert_pending(&mut out_stream).await;

        tx.disconnect();

        // With the sender gone, the output stream ends.
        assert_eq!(Poll::Ready(None), futures::poll!(out_stream.next()));
    }

    #[tokio::test]
    async fn emits_internal_events() {
        assert_transform_compliance(async move {
            let config = ThrottleConfig {
                threshold: 1,
                window_secs: Duration::from_secs_f64(1.0),
                key_field: None,
                exclude: None,
                internal_metrics: Default::default(),
            };
            let (sender, receiver) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(receiver), config).await;

            let event = LogEvent::from("hello world").into();
            sender.send(event).await.unwrap();

            // Drain the forwarded event; its contents are not inspected.
            _ = out.recv().await;

            drop(sender);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}