vector/transforms/throttle/
rate_limiter.rs

1use std::{hash::Hash, sync::Arc, time::Duration};
2
3use governor::{
4    Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore,
5};
6use tokio;
7
8/// Re-usable wrapper around the structs/type from the governor crate.
9/// Spawns a background task that periodically flushes keys that haven't been accessed recently.
10pub struct RateLimiterRunner<K, C>
11where
12    K: Hash + Eq + Clone,
13    C: clock::Clock,
14{
15    pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
16    flush_handle: tokio::task::JoinHandle<()>,
17}
18
19impl<K, C> RateLimiterRunner<K, C>
20where
21    K: Hash + Eq + Clone + Send + Sync + 'static,
22    C: clock::Clock + Clone + Send + Sync + 'static,
23{
24    pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self {
25        let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock));
26
27        let rate_limiter_clone = Arc::clone(&rate_limiter);
28        let flush_handle = tokio::spawn(async move {
29            let mut interval = tokio::time::interval(flush_keys_interval);
30            loop {
31                interval.tick().await;
32                rate_limiter_clone.retain_recent();
33            }
34        });
35
36        Self {
37            rate_limiter,
38            flush_handle,
39        }
40    }
41
42    pub fn check_key(&self, key: &K) -> bool {
43        self.rate_limiter.check_key(key).is_ok()
44    }
45}
46
47impl<K, C> Drop for RateLimiterRunner<K, C>
48where
49    K: Hash + Eq + Clone,
50    C: clock::Clock,
51{
52    fn drop(&mut self) {
53        self.flush_handle.abort();
54    }
55}