vector/transforms/throttle/
rate_limiter.rs1use governor::clock;
2use governor::middleware::NoOpMiddleware;
3use governor::state::keyed::DashMapStateStore;
4use governor::{Quota, RateLimiter};
5use std::hash::Hash;
6use std::sync::Arc;
7use std::time::Duration;
8use tokio;
9
10pub struct RateLimiterRunner<K, C>
13where
14 K: Hash + Eq + Clone,
15 C: clock::Clock,
16{
17 pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
18 flush_handle: tokio::task::JoinHandle<()>,
19}
20
21impl<K, C> RateLimiterRunner<K, C>
22where
23 K: Hash + Eq + Clone + Send + Sync + 'static,
24 C: clock::Clock + Clone + Send + Sync + 'static,
25{
26 pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self {
27 let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock));
28
29 let rate_limiter_clone = Arc::clone(&rate_limiter);
30 let flush_handle = tokio::spawn(async move {
31 let mut interval = tokio::time::interval(flush_keys_interval);
32 loop {
33 interval.tick().await;
34 rate_limiter_clone.retain_recent();
35 }
36 });
37
38 Self {
39 rate_limiter,
40 flush_handle,
41 }
42 }
43
44 pub fn check_key(&self, key: &K) -> bool {
45 self.rate_limiter.check_key(key).is_ok()
46 }
47}
48
49impl<K, C> Drop for RateLimiterRunner<K, C>
50where
51 K: Hash + Eq + Clone,
52 C: clock::Clock,
53{
54 fn drop(&mut self) {
55 self.flush_handle.abort();
56 }
57}