vector/expiring_hash_map.rs
1//! Expiring Hash Map and related types. See [`ExpiringHashMap`].
2#![warn(missing_docs)]
3
4use std::{
5 borrow::Borrow,
6 collections::HashMap,
7 fmt,
8 hash::Hash,
9 time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use tokio_util::time::{DelayQueue, delay_queue};
14
/// An expired item: the stored value, followed by the key wrapped in the
/// [`delay_queue::Expired`] entry that carries its expiration information.
pub type ExpiredItem<K, V> = (V, delay_queue::Expired<K>);
17
/// A [`HashMap`] that maintains deadlines for the keys via a [`DelayQueue`].
///
/// Every map entry stores its value together with the [`delay_queue::Key`] of
/// the matching entry in the expiration queue, so the deadline of a map key
/// can be looked up, reset or removed.
pub struct ExpiringHashMap<K, V> {
    /// Values by key, each paired with the handle of its expiration-queue entry.
    map: HashMap<K, (V, delay_queue::Key)>,
    /// Pending expirations; holds a clone of every key stored in `map`.
    expiration_queue: DelayQueue<K>,
}
23
// Marks the map as `Unpin` for every `K` and `V`, without requiring the type
// parameters themselves to be `Unpin`.
impl<K, V> Unpin for ExpiringHashMap<K, V> {}
25
26impl<K, V> ExpiringHashMap<K, V>
27where
28 K: Eq + Hash + Clone,
29{
30 /// Insert a new key with a TTL.
31 pub fn insert(&mut self, key: K, value: V, ttl: Duration) {
32 let delay_queue_key = self.expiration_queue.insert(key.clone(), ttl);
33 self.map.insert(key, (value, delay_queue_key));
34 }
35
36 /// Insert a new value with a deadline.
37 pub fn insert_at(&mut self, key: K, value: V, deadline: Instant) {
38 let delay_queue_key = self
39 .expiration_queue
40 .insert_at(key.clone(), deadline.into());
41 self.map.insert(key, (value, delay_queue_key));
42 }
43
44 /// Get a reference to the value by key.
45 pub fn get<Q>(&self, k: &Q) -> Option<&V>
46 where
47 K: Borrow<Q>,
48 Q: ?Sized + Hash + Eq,
49 {
50 self.map.get(k).map(|(v, _)| v)
51 }
52
53 /// Get a reference to the value by key with the expiration information.
54 pub fn get_with_deadline<Q>(&self, k: &Q) -> Option<(&V, Instant)>
55 where
56 K: Borrow<Q>,
57 Q: ?Sized + Hash + Eq,
58 {
59 let (value, delay_queue_key) = self.map.get(k)?;
60 let deadline = self.expiration_queue.deadline(delay_queue_key);
61 Some((value, deadline.into()))
62 }
63
64 /// Get a mut reference to the value by key.
65 pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
66 where
67 K: Borrow<Q>,
68 Q: ?Sized + Hash + Eq,
69 {
70 self.map.get_mut(k).map(|&mut (ref mut v, _)| v)
71 }
72
73 /// Reset the deadline for a key, and return a mut ref to the value.
74 pub fn reset_at<Q>(&mut self, k: &Q, when: Instant) -> Option<&mut V>
75 where
76 K: Borrow<Q>,
77 Q: ?Sized + Hash + Eq,
78 {
79 let (value, delay_queue_key) = self.map.get_mut(k)?;
80 self.expiration_queue.reset_at(delay_queue_key, when.into());
81 Some(value)
82 }
83
84 /// Reset the key if it exists, returning the value and the expiration
85 /// information.
86 pub fn remove<Q>(&mut self, k: &Q) -> Option<ExpiredItem<K, V>>
87 where
88 K: Borrow<Q>,
89 Q: ?Sized + Hash + Eq,
90 {
91 let (value, expiration_queue_key) = self.map.remove(k)?;
92 let expired = self.expiration_queue.remove(&expiration_queue_key);
93 Some((value, expired))
94 }
95
96 /// Return an iterator over keys and values of ExpiringHashMap. Useful for
97 /// processing all values in ExpiringHashMap irrespective of expiration. This
98 /// may be required for processing shutdown or other operations.
99 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
100 self.map.iter_mut().map(|(k, (v, _delayed_key))| (k, v))
101 }
102
103 /// Check whether the [`ExpiringHashMap`] is empty.
104 /// If it's empty, the `next_expired` function immediately resolves to
105 /// [`None`]. Be aware that this may cause a spinlock behaviour if the
106 /// `next_expired` is polled in a loop while [`ExpiringHashMap`] is empty.
107 /// See [`ExpiringHashMap::next_expired`] for more info.
108 pub fn is_empty(&self) -> bool {
109 self.expiration_queue.is_empty()
110 }
111
112 /// Returns the number of elements in the map.
113 pub fn len(&self) -> usize {
114 self.map.len()
115 }
116
117 /// If the [`ExpiringHashMap`] is empty, immediately returns `None`.
118 /// Otherwise, waits for the closest deadline, removes expired item and
119 /// returns it.
120 ///
121 /// Be aware that misuse of this function may cause a spinlock! If you want
122 /// to `select` on this future in a loop, be sure to check
123 /// [`ExpiringHashMap::is_empty`] and skip polling on
124 /// [`ExpiringHashMap::next_expired`] if the [`ExpiringHashMap`] is empty.
125 /// Otherwise, when the [`ExpiringHashMap`] is empty you'll effectively get
126 /// a spinlock on the first value insertion.
127 ///
128 /// We currently don't offer an API that would allow simply waiting for
129 /// expired items regardless of what state the [`ExpiringHashMap`] is.
130 /// This is a deliberate design decision, we went with it for the following
131 /// reasons:
132 /// 1. Use of `async fn`. One of the benefits of this API is that it relies
133 /// only on `async fn`s, and doesn't require manual `Future`
134 /// implementation. While this is not a problem in general, but there is
135 /// some value with doing it this way. With a switch to `async` across
136 /// our code base, the idea is that we should completely eliminate manual
137 /// `Future` implementations and poll fns. This is controversial, but we
138 /// decided to give it a try.
139 /// 2. We don't know all the use cases, and exposing this kind of API might
140 /// make more sense, since it allows more flexibility.
141 /// We were choosing between, effectively, the current "drain"-like API,
142 /// and the "queue" like API.
143 /// Current ("drain"-like) API waits on the deadline or returns `None`
144 /// when there are no more items. Very similar how we [`Vec::drain`] iter
145 /// works.
146 /// The "queue"-like API would, pretty much, be simply waiting expired
147 /// items to appear. In the case of empty [`ExpiringHashMap`], we would
148 /// wait indefinitely - or until an item is inserted. This would be
149 /// possible to carry on, for instance, from a sibling branch of a
150 /// `select` statement, so the borrowing rules won't be a problem here.
151 /// 3. We went over the following alternative signature:
152 /// ```ignore
153 /// pub fn next_expired(&mut self) -> Option<impl Future<Outcome = Result<ExpiredItem<K, V>, Error>>> {...}
154 /// ```
155 /// This captures the API restrictions a bit better, and should provide
156 /// less possibilities to misuse the API.
157 /// We didn't pick this one because it's not an `async fn` and we wanted
158 /// this, see (1) of this list. Furthermore, instead of doing a
159 /// `select { _ = map.next_expired(), if !map.is_empty() => { ... } }`
160 /// users would have to do
161 /// `let exp = map.next_expired(); select { _ = exp.unwrap(), if exp.is_some() => { ... } }`,
162 /// which is less readable and a bit harder to understand. Although it
163 /// has a possibility of a nicer generalization if `select` macro
164 /// supported a `Some(future)` kind of pattern matching, we decided to go
165 /// with other solution for now.
166 ///
167 /// # Examples
168 ///
169 /// ```rust
170 /// # let rt = tokio::runtime::Runtime::new().unwrap();
171 /// # rt.block_on(async {
172 /// use vector::expiring_hash_map::ExpiringHashMap;
173 /// use std::time::Duration;
174 ///
175 /// let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
176 ///
177 /// loop {
178 /// tokio::select! {
179 /// // You need to ensure that this branch is disabled if the map
180 /// // is empty! Not doing this will result in a spinlock.
181 /// val = map.next_expired(), if !map.is_empty() => match val {
182 /// None => unreachable!(), // we never poll the empty map in the first place!
183 /// Some((val, _)) => {
184 /// println!("Expired: {}", val);
185 /// break;
186 /// }
187 /// },
188 /// _ = tokio::time::sleep(Duration::from_millis(100)) => map.insert(
189 /// "key".to_owned(),
190 /// "val".to_owned(),
191 /// Duration::from_millis(30),
192 /// ),
193 /// }
194 /// }
195 /// # });
196 /// ```
197 pub async fn next_expired(&mut self) -> Option<ExpiredItem<K, V>> {
198 self.expiration_queue.next().await.map(|key| {
199 let (value, _) = self.map.remove(key.get_ref()).unwrap();
200 (value, key)
201 })
202 }
203}
204
205impl<K, V> Default for ExpiringHashMap<K, V>
206where
207 K: Eq + Hash + Clone,
208{
209 fn default() -> Self {
210 Self {
211 map: HashMap::new(),
212 expiration_queue: DelayQueue::new(),
213 }
214 }
215}
216
217impl<K, V> fmt::Debug for ExpiringHashMap<K, V>
218where
219 K: Eq + Hash + Clone,
220{
221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222 f.debug_struct("ExpiringHashMap").finish()
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use std::task::Poll;
229
230 use tokio_test::{assert_pending, assert_ready, task};
231
232 use super::*;
233
234 fn unwrap_ready<T>(poll: Poll<T>) -> T {
235 assert_ready!(&poll);
236 match poll {
237 Poll::Ready(val) => val,
238 _ => unreachable!(),
239 }
240 }
241
#[test]
fn next_expired_is_pending_with_empty_map() {
    // Despite the test's name, the future does not stay pending on an empty
    // map: it resolves right away with `None`.
    let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
    let mut fut = task::spawn(map.next_expired());
    let outcome = unwrap_ready(fut.poll());
    assert!(outcome.is_none());
}
248
249 #[tokio::test]
250 async fn next_expired_is_pending_with_a_non_empty_map() {
251 let mut map = ExpiringHashMap::<String, String>::default();
252 map.insert("key".to_owned(), "val".to_owned(), Duration::from_secs(1));
253 let mut fut = task::spawn(map.next_expired());
254 assert_pending!(fut.poll());
255 }
256
257 #[tokio::test]
258 async fn next_expired_does_not_wake_when_the_value_is_available_upfront() {
259 let mut map = ExpiringHashMap::<String, String>::default();
260
261 let a_minute_ago = Instant::now() - Duration::from_secs(60);
262 map.insert_at("key".to_owned(), "val".to_owned(), a_minute_ago);
263
264 let mut fut = task::spawn(map.next_expired());
265 assert_eq!(unwrap_ready(fut.poll()).unwrap().0, "val");
266 assert!(!fut.is_woken());
267 }
268
269 #[tokio::test(start_paused = true)]
270 async fn next_expired_wakes_and_becomes_ready_when_value_ttl_expires() {
271 let mut map = ExpiringHashMap::<String, String>::default();
272
273 let ttl = Duration::from_secs(1);
274 map.insert("key".to_owned(), "val".to_owned(), ttl);
275
276 let mut fut = task::spawn(map.next_expired());
277
278 // At first, has to be pending.
279 assert_pending!(fut.poll());
280 assert!(!fut.is_woken());
281
282 // Then, after deadline, has to be ready.
283 tokio::time::advance(Duration::from_secs(1)).await;
284 assert!(fut.is_woken());
285 let value = assert_ready!(fut.poll());
286 let (key, value) = value
287 .map(|(value, key)| (key.into_inner(), value))
288 .expect("map definitively had entry that should be expired");
289 assert_eq!(key, "key".to_owned());
290 assert_eq!(value, "val".to_owned());
291 }
292
293 #[tokio::test]
294 async fn next_expired_api_allows_inserting_items() {
295 let mut map = ExpiringHashMap::<String, String>::default();
296
297 // At first, has to be pending.
298 let mut fut = task::spawn(map.next_expired());
299 assert!(unwrap_ready(fut.poll()).is_none());
300 drop(fut);
301
302 // Insert an item.
303 let ttl = Duration::from_secs(1000);
304 map.insert("key".to_owned(), "val".to_owned(), ttl);
305
306 // Then, after value is inserted, has to be still pending.
307 let mut fut = task::spawn(map.next_expired());
308 assert_pending!(fut.poll());
309 }
310}