vector/expiring_hash_map.rs
1//! Expiring Hash Map and related types. See [`ExpiringHashMap`].
2#![warn(missing_docs)]
3
4use std::{
5 borrow::Borrow,
6 collections::HashMap,
7 fmt,
8 hash::Hash,
9 time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use tokio_util::time::{delay_queue, DelayQueue};
14
/// An expired item: the stored value paired with the [`delay_queue::Expired`]
/// entry, which holds the key along with its expiration information.
pub type ExpiredItem<K, V> = (V, delay_queue::Expired<K>);
17
/// A [`HashMap`] that maintains deadlines for the keys via a [`DelayQueue`].
pub struct ExpiringHashMap<K, V> {
    /// Stored values, each paired with the handle of the matching entry in
    /// `expiration_queue`.
    map: HashMap<K, (V, delay_queue::Key)>,
    /// Queue of keys with their deadlines; it drives the expiration of the
    /// entries in `map`.
    expiration_queue: DelayQueue<K>,
}
23
// `ExpiringHashMap` is declared `Unpin` for every `K` and `V`, with no bounds
// on whether those types are `Unpin` themselves.
impl<K, V> Unpin for ExpiringHashMap<K, V> {}
25
26impl<K, V> ExpiringHashMap<K, V>
27where
28 K: Eq + Hash + Clone,
29{
30 /// Insert a new key with a TTL.
31 pub fn insert(&mut self, key: K, value: V, ttl: Duration) {
32 let delay_queue_key = self.expiration_queue.insert(key.clone(), ttl);
33 self.map.insert(key, (value, delay_queue_key));
34 }
35
36 /// Insert a new value with a deadline.
37 pub fn insert_at(&mut self, key: K, value: V, deadline: Instant) {
38 let delay_queue_key = self
39 .expiration_queue
40 .insert_at(key.clone(), deadline.into());
41 self.map.insert(key, (value, delay_queue_key));
42 }
43
44 /// Get a reference to the value by key.
45 pub fn get<Q>(&self, k: &Q) -> Option<&V>
46 where
47 K: Borrow<Q>,
48 Q: ?Sized + Hash + Eq,
49 {
50 self.map.get(k).map(|(v, _)| v)
51 }
52
53 /// Get a mut reference to the value by key.
54 pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
55 where
56 K: Borrow<Q>,
57 Q: ?Sized + Hash + Eq,
58 {
59 self.map.get_mut(k).map(|&mut (ref mut v, _)| v)
60 }
61
62 /// Reset the deadline for a key, and return a mut ref to the value.
63 pub fn reset_at<Q>(&mut self, k: &Q, when: Instant) -> Option<&mut V>
64 where
65 K: Borrow<Q>,
66 Q: ?Sized + Hash + Eq,
67 {
68 let (value, delay_queue_key) = self.map.get_mut(k)?;
69 self.expiration_queue.reset_at(delay_queue_key, when.into());
70 Some(value)
71 }
72
73 /// Reset the key if it exists, returning the value and the expiration
74 /// information.
75 pub fn remove<Q>(&mut self, k: &Q) -> Option<ExpiredItem<K, V>>
76 where
77 K: Borrow<Q>,
78 Q: ?Sized + Hash + Eq,
79 {
80 let (value, expiration_queue_key) = self.map.remove(k)?;
81 let expired = self.expiration_queue.remove(&expiration_queue_key);
82 Some((value, expired))
83 }
84
85 /// Return an iterator over keys and values of ExpiringHashMap. Useful for
86 /// processing all values in ExpiringHashMap irrespective of expiration. This
87 /// may be required for processing shutdown or other operations.
88 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
89 self.map.iter_mut().map(|(k, (v, _delayed_key))| (k, v))
90 }
91
92 /// Check whether the [`ExpiringHashMap`] is empty.
93 /// If it's empty, the `next_expired` function immediately resolves to
94 /// [`None`]. Be aware that this may cause a spinlock behaviour if the
95 /// `next_expired` is polled in a loop while [`ExpiringHashMap`] is empty.
96 /// See [`ExpiringHashMap::next_expired`] for more info.
97 pub fn is_empty(&self) -> bool {
98 self.expiration_queue.is_empty()
99 }
100
101 /// Returns the number of elements in the map.
102 pub fn len(&self) -> usize {
103 self.map.len()
104 }
105
106 /// If the [`ExpiringHashMap`] is empty, immediately returns `None`.
107 /// Otherwise, waits for the closest deadline, removes expired item and
108 /// returns it.
109 ///
110 /// Be aware that misuse of this function may cause a spinlock! If you want
111 /// to `select` on this future in a loop, be sure to check
112 /// [`ExpiringHashMap::is_empty`] and skip polling on
113 /// [`ExpiringHashMap::next_expired`] if the [`ExpiringHashMap`] is empty.
114 /// Otherwise, when the [`ExpiringHashMap`] is empty you'll effectively get
115 /// a spinlock on the first value insertion.
116 ///
117 /// We currently don't offer an API that would allow simply waiting for
118 /// expired items regardless of what state the [`ExpiringHashMap`] is.
119 /// This is a deliberate design decision, we went with it for the following
120 /// reasons:
121 /// 1. Use of `async fn`. One of the benefits of this API is that it relies
122 /// only on `async fn`s, and doesn't require manual `Future`
123 /// implementation. While this is not a problem in general, but there is
124 /// some value with doing it this way. With a switch to `async` across
125 /// our code base, the idea is that we should completely eliminate manual
126 /// `Future` implementations and poll fns. This is controversial, but we
127 /// decided to give it a try.
128 /// 2. We don't know all the use cases, and exposing this kind of API might
129 /// make more sense, since it allows more flexibility.
130 /// We were choosing between, effectively, the current "drain"-like API,
131 /// and the "queue" like API.
132 /// Current ("drain"-like) API waits on the deadline or returns `None`
133 /// when there are no more items. Very similar how we [`Vec::drain`] iter
134 /// works.
135 /// The "queue"-like API would, pretty much, be simply waiting expired
136 /// items to appear. In the case of empty [`ExpiringHashMap`], we would
137 /// wait indefinitely - or until an item is inserted. This would be
138 /// possible to carry on, for instance, from a sibling branch of a
139 /// `select` statement, so the borrowing rules won't be a problem here.
140 /// 3. We went over the following alternative signature:
141 /// ```ignore
142 /// pub fn next_expired(&mut self) -> Option<impl Future<Outcome = Result<ExpiredItem<K, V>, Error>>> {...}
143 /// ```
144 /// This captures the API restrictions a bit better, and should provide
145 /// less possibilities to misuse the API.
146 /// We didn't pick this one because it's not an `async fn` and we wanted
147 /// this, see (1) of this list. Furthermore, instead of doing a
148 /// `select { _ = map.next_expired(), if !map.is_empty() => { ... } }`
149 /// users would have to do
150 /// `let exp = map.next_expired(); select { _ = exp.unwrap(), if exp.is_some() => { ... } }`,
151 /// which is less readable and a bit harder to understand. Although it
152 /// has a possibility of a nicer generalization if `select` macro
153 /// supported a `Some(future)` kind of pattern matching, we decided to go
154 /// with other solution for now.
155 ///
156 /// # Examples
157 ///
158 /// ```rust
159 /// # let rt = tokio::runtime::Runtime::new().unwrap();
160 /// # rt.block_on(async {
161 /// use vector::expiring_hash_map::ExpiringHashMap;
162 /// use std::time::Duration;
163 ///
164 /// let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
165 ///
166 /// loop {
167 /// tokio::select! {
168 /// // You need to ensure that this branch is disabled if the map
169 /// // is empty! Not doing this will result in a spinlock.
170 /// val = map.next_expired(), if !map.is_empty() => match val {
171 /// None => unreachable!(), // we never poll the empty map in the first place!
172 /// Some((val, _)) => {
173 /// println!("Expired: {}", val);
174 /// break;
175 /// }
176 /// },
177 /// _ = tokio::time::sleep(Duration::from_millis(100)) => map.insert(
178 /// "key".to_owned(),
179 /// "val".to_owned(),
180 /// Duration::from_millis(30),
181 /// ),
182 /// }
183 /// }
184 /// # });
185 /// ```
186 pub async fn next_expired(&mut self) -> Option<ExpiredItem<K, V>> {
187 self.expiration_queue.next().await.map(|key| {
188 let (value, _) = self.map.remove(key.get_ref()).unwrap();
189 (value, key)
190 })
191 }
192}
193
194impl<K, V> Default for ExpiringHashMap<K, V>
195where
196 K: Eq + Hash + Clone,
197{
198 fn default() -> Self {
199 Self {
200 map: HashMap::new(),
201 expiration_queue: DelayQueue::new(),
202 }
203 }
204}
205
206impl<K, V> fmt::Debug for ExpiringHashMap<K, V>
207where
208 K: Eq + Hash + Clone,
209{
210 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211 f.debug_struct("ExpiringHashMap").finish()
212 }
213}
214
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use tokio_test::{assert_pending, assert_ready, task};

    use super::*;

    /// Extracts the value from a `Poll`, failing the test if it is not ready.
    fn unwrap_ready<T>(poll: Poll<T>) -> T {
        assert_ready!(&poll);
        if let Poll::Ready(val) = poll {
            val
        } else {
            unreachable!()
        }
    }

    #[test]
    fn next_expired_is_pending_with_empty_map() {
        let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();

        // With nothing stored, the future resolves on the first poll and
        // yields `None`.
        let mut fut = task::spawn(map.next_expired());
        let first_poll = unwrap_ready(fut.poll());
        assert!(first_poll.is_none());
    }

    #[tokio::test]
    async fn next_expired_is_pending_with_a_non_empty_map() {
        let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
        let ttl = Duration::from_secs(1);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        // The only entry has not reached its deadline yet.
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }

    #[tokio::test]
    async fn next_expired_does_not_wake_when_the_value_is_available_upfront() {
        let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();

        // Schedule the entry with a deadline that is already in the past.
        let a_minute_ago = Instant::now() - Duration::from_secs(60);
        map.insert_at("key".to_owned(), "val".to_owned(), a_minute_ago);

        let mut fut = task::spawn(map.next_expired());
        let (value, _expired) = unwrap_ready(fut.poll()).unwrap();
        assert_eq!(value, "val");
        assert!(!fut.is_woken());
    }

    #[tokio::test(start_paused = true)]
    async fn next_expired_wakes_and_becomes_ready_when_value_ttl_expires() {
        let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();

        let ttl = Duration::from_secs(1);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        let mut fut = task::spawn(map.next_expired());

        // Before the deadline the future is pending and has not been woken.
        assert_pending!(fut.poll());
        assert!(!fut.is_woken());

        // Once the clock passes the deadline, the future is woken and ready.
        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(fut.is_woken());
        let (value, expired) = assert_ready!(fut.poll())
            .expect("map definitively had entry that should be expired");
        assert_eq!(expired.into_inner(), "key".to_owned());
        assert_eq!(value, "val".to_owned());
    }

    #[tokio::test]
    async fn next_expired_api_allows_inserting_items() {
        let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();

        // While the map is empty, the future resolves immediately with `None`.
        // The scope ends the mutable borrow so the map can be modified below.
        {
            let mut fut = task::spawn(map.next_expired());
            assert!(unwrap_ready(fut.poll()).is_none());
        }

        // Insert an item with a deadline far in the future.
        let ttl = Duration::from_secs(1000);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        // A fresh future now has to be pending, waiting for that deadline.
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }
}
299}