vector/transforms/throttle/
rate_limiter.rs

1use governor::clock;
2use governor::middleware::NoOpMiddleware;
3use governor::state::keyed::DashMapStateStore;
4use governor::{Quota, RateLimiter};
5use std::hash::Hash;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio;
9
10/// Re-usable wrapper around the structs/type from the governor crate.
11/// Spawns a background task that periodically flushes keys that haven't been accessed recently.
12pub struct RateLimiterRunner<K, C>
13where
14    K: Hash + Eq + Clone,
15    C: clock::Clock,
16{
17    pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
18    flush_handle: tokio::task::JoinHandle<()>,
19}
20
21impl<K, C> RateLimiterRunner<K, C>
22where
23    K: Hash + Eq + Clone + Send + Sync + 'static,
24    C: clock::Clock + Clone + Send + Sync + 'static,
25{
26    pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self {
27        let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock));
28
29        let rate_limiter_clone = Arc::clone(&rate_limiter);
30        let flush_handle = tokio::spawn(async move {
31            let mut interval = tokio::time::interval(flush_keys_interval);
32            loop {
33                interval.tick().await;
34                rate_limiter_clone.retain_recent();
35            }
36        });
37
38        Self {
39            rate_limiter,
40            flush_handle,
41        }
42    }
43
44    pub fn check_key(&self, key: &K) -> bool {
45        self.rate_limiter.check_key(key).is_ok()
46    }
47}
48
49impl<K, C> Drop for RateLimiterRunner<K, C>
50where
51    K: Hash + Eq + Clone,
52    C: clock::Clock,
53{
54    fn drop(&mut self) {
55        self.flush_handle.abort();
56    }
57}