vector/transforms/throttle/
rate_limiter.rs1use std::{hash::Hash, sync::Arc, time::Duration};
2
3use governor::{
4 Quota, RateLimiter, clock, middleware::NoOpMiddleware, state::keyed::DashMapStateStore,
5};
6use tokio;
7
8pub struct RateLimiterRunner<K, C>
11where
12 K: Hash + Eq + Clone,
13 C: clock::Clock,
14{
15 pub rate_limiter: Arc<RateLimiter<K, DashMapStateStore<K>, C, NoOpMiddleware<C::Instant>>>,
16 flush_handle: tokio::task::JoinHandle<()>,
17}
18
19impl<K, C> RateLimiterRunner<K, C>
20where
21 K: Hash + Eq + Clone + Send + Sync + 'static,
22 C: clock::Clock + Clone + Send + Sync + 'static,
23{
24 pub fn start(quota: Quota, clock: C, flush_keys_interval: Duration) -> Self {
25 let rate_limiter = Arc::new(RateLimiter::dashmap_with_clock(quota, clock));
26
27 let rate_limiter_clone = Arc::clone(&rate_limiter);
28 let flush_handle = tokio::spawn(async move {
29 let mut interval = tokio::time::interval(flush_keys_interval);
30 loop {
31 interval.tick().await;
32 rate_limiter_clone.retain_recent();
33 }
34 });
35
36 Self {
37 rate_limiter,
38 flush_handle,
39 }
40 }
41
42 pub fn check_key(&self, key: &K) -> bool {
43 self.rate_limiter.check_key(key).is_ok()
44 }
45}
46
47impl<K, C> Drop for RateLimiterRunner<K, C>
48where
49 K: Hash + Eq + Clone,
50 C: clock::Clock,
51{
52 fn drop(&mut self) {
53 self.flush_handle.abort();
54 }
55}