vector/
expiring_hash_map.rs

1//! Expiring Hash Map and related types. See [`ExpiringHashMap`].
2#![warn(missing_docs)]
3
4use std::{
5    borrow::Borrow,
6    collections::HashMap,
7    fmt,
8    hash::Hash,
9    time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use tokio_util::time::{DelayQueue, delay_queue};
14
/// An expired (or explicitly removed) item: the stored value paired with the
/// [`delay_queue::Expired`] wrapper that carries the key together with its
/// expiration information.
pub type ExpiredItem<K, V> = (V, delay_queue::Expired<K>);
17
/// A [`HashMap`] that maintains deadlines for the keys via a [`DelayQueue`].
///
/// Values are looked up through the map, while the queue decides when a key
/// is due; see [`ExpiringHashMap::next_expired`] for how expired entries are
/// retrieved.
pub struct ExpiringHashMap<K, V> {
    // Each value is stored next to the delay-queue key of its deadline entry,
    // so the deadline can be inspected, reset, or removed given only the map
    // key.
    map: HashMap<K, (V, delay_queue::Key)>,
    // Holds a clone of every inserted map key, scheduled at its deadline.
    expiration_queue: DelayQueue<K>,
}
23
// Implemented without any bounds: the map is `Unpin` regardless of whether
// `K` or `V` are.
// NOTE(review): presumably this exists so that the map can be polled without
// pinning constraints leaking from the key/value types - confirm.
impl<K, V> Unpin for ExpiringHashMap<K, V> {}
25
26impl<K, V> ExpiringHashMap<K, V>
27where
28    K: Eq + Hash + Clone,
29{
30    /// Insert a new key with a TTL.
31    pub fn insert(&mut self, key: K, value: V, ttl: Duration) {
32        let delay_queue_key = self.expiration_queue.insert(key.clone(), ttl);
33        self.map.insert(key, (value, delay_queue_key));
34    }
35
36    /// Insert a new value with a deadline.
37    pub fn insert_at(&mut self, key: K, value: V, deadline: Instant) {
38        let delay_queue_key = self
39            .expiration_queue
40            .insert_at(key.clone(), deadline.into());
41        self.map.insert(key, (value, delay_queue_key));
42    }
43
44    /// Get a reference to the value by key.
45    pub fn get<Q>(&self, k: &Q) -> Option<&V>
46    where
47        K: Borrow<Q>,
48        Q: ?Sized + Hash + Eq,
49    {
50        self.map.get(k).map(|(v, _)| v)
51    }
52
53    /// Get a reference to the value by key with the expiration information.
54    pub fn get_with_deadline<Q>(&self, k: &Q) -> Option<(&V, Instant)>
55    where
56        K: Borrow<Q>,
57        Q: ?Sized + Hash + Eq,
58    {
59        let (value, delay_queue_key) = self.map.get(k)?;
60        let deadline = self.expiration_queue.deadline(delay_queue_key);
61        Some((value, deadline.into()))
62    }
63
64    /// Get a mut reference to the value by key.
65    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
66    where
67        K: Borrow<Q>,
68        Q: ?Sized + Hash + Eq,
69    {
70        self.map.get_mut(k).map(|&mut (ref mut v, _)| v)
71    }
72
73    /// Reset the deadline for a key, and return a mut ref to the value.
74    pub fn reset_at<Q>(&mut self, k: &Q, when: Instant) -> Option<&mut V>
75    where
76        K: Borrow<Q>,
77        Q: ?Sized + Hash + Eq,
78    {
79        let (value, delay_queue_key) = self.map.get_mut(k)?;
80        self.expiration_queue.reset_at(delay_queue_key, when.into());
81        Some(value)
82    }
83
84    /// Reset the key if it exists, returning the value and the expiration
85    /// information.
86    pub fn remove<Q>(&mut self, k: &Q) -> Option<ExpiredItem<K, V>>
87    where
88        K: Borrow<Q>,
89        Q: ?Sized + Hash + Eq,
90    {
91        let (value, expiration_queue_key) = self.map.remove(k)?;
92        let expired = self.expiration_queue.remove(&expiration_queue_key);
93        Some((value, expired))
94    }
95
96    /// Return an iterator over keys and values of ExpiringHashMap. Useful for
97    /// processing all values in ExpiringHashMap irrespective of expiration. This
98    /// may be required for processing shutdown or other operations.
99    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
100        self.map.iter_mut().map(|(k, (v, _delayed_key))| (k, v))
101    }
102
103    /// Check whether the [`ExpiringHashMap`] is empty.
104    /// If it's empty, the `next_expired` function immediately resolves to
105    /// [`None`]. Be aware that this may cause a spinlock behaviour if the
106    /// `next_expired` is polled in a loop while [`ExpiringHashMap`] is empty.
107    /// See [`ExpiringHashMap::next_expired`] for more info.
108    pub fn is_empty(&self) -> bool {
109        self.expiration_queue.is_empty()
110    }
111
112    /// Returns the number of elements in the map.
113    pub fn len(&self) -> usize {
114        self.map.len()
115    }
116
117    /// If the [`ExpiringHashMap`] is empty, immediately returns `None`.
118    /// Otherwise, waits for the closest deadline, removes expired item and
119    /// returns it.
120    ///
121    /// Be aware that misuse of this function may cause a spinlock! If you want
122    /// to `select` on this future in a loop, be sure to check
123    /// [`ExpiringHashMap::is_empty`] and skip polling on
124    /// [`ExpiringHashMap::next_expired`] if the [`ExpiringHashMap`] is empty.
125    /// Otherwise, when the [`ExpiringHashMap`] is empty you'll effectively get
126    /// a spinlock on the first value insertion.
127    ///
128    /// We currently don't offer an API that would allow simply waiting for
129    /// expired items regardless of what state the [`ExpiringHashMap`] is.
130    /// This is a deliberate design decision, we went with it for the following
131    /// reasons:
132    /// 1. Use of `async fn`. One of the benefits of this API is that it relies
133    ///    only on `async fn`s, and doesn't require manual `Future`
134    ///    implementation. While this is not a problem in general, but there is
135    ///    some value with doing it this way. With a switch to `async` across
136    ///    our code base, the idea is that we should completely eliminate manual
137    ///    `Future` implementations and poll fns. This is controversial, but we
138    ///    decided to give it a try.
139    /// 2. We don't know all the use cases, and exposing this kind of API might
140    ///    make more sense, since it allows more flexibility.
141    ///    We were choosing between, effectively, the current "drain"-like API,
142    ///    and the "queue" like API.
143    ///    Current ("drain"-like) API waits on the deadline or returns `None`
144    ///    when there are no more items. Very similar how we [`Vec::drain`] iter
145    ///    works.
146    ///    The "queue"-like API would, pretty much, be simply waiting expired
147    ///    items to appear. In the case of empty [`ExpiringHashMap`], we would
148    ///    wait indefinitely - or until an item is inserted. This would be
149    ///    possible to carry on, for instance, from a sibling branch of a
150    ///    `select` statement, so the borrowing rules won't be a problem here.
151    /// 3. We went over the following alternative signature:
152    ///    ```ignore
153    ///    pub fn next_expired(&mut self) -> Option<impl Future<Outcome = Result<ExpiredItem<K, V>, Error>>> {...}
154    ///    ```
155    ///    This captures the API restrictions a bit better, and should provide
156    ///    less possibilities to misuse the API.
157    ///    We didn't pick this one because it's not an `async fn` and we wanted
158    ///    this, see (1) of this list. Furthermore, instead of doing a
159    ///    `select { _ = map.next_expired(), if !map.is_empty() => { ... } }`
160    ///    users would have to do
161    ///    `let exp = map.next_expired(); select { _ = exp.unwrap(), if exp.is_some() => { ... } }`,
162    ///    which is less readable and a bit harder to understand. Although it
163    ///    has a possibility of a nicer generalization if `select` macro
164    ///    supported a `Some(future)` kind of pattern matching, we decided to go
165    ///    with other solution for now.
166    ///
167    /// # Examples
168    ///
169    /// ```rust
170    /// # let rt = tokio::runtime::Runtime::new().unwrap();
171    /// # rt.block_on(async {
172    /// use vector::expiring_hash_map::ExpiringHashMap;
173    /// use std::time::Duration;
174    ///
175    /// let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
176    ///
177    /// loop {
178    ///     tokio::select! {
179    ///         // You need to ensure that this branch is disabled if the map
180    ///         // is empty! Not doing this will result in a spinlock.
181    ///         val = map.next_expired(), if !map.is_empty() => match val {
182    ///             None => unreachable!(), // we never poll the empty map in the first place!
183    ///             Some((val, _)) => {
184    ///                 println!("Expired: {}", val);
185    ///                 break;
186    ///             }
187    ///         },
188    ///         _ = tokio::time::sleep(Duration::from_millis(100)) => map.insert(
189    ///             "key".to_owned(),
190    ///             "val".to_owned(),
191    ///             Duration::from_millis(30),
192    ///         ),
193    ///     }
194    /// }
195    /// # });
196    /// ```
197    pub async fn next_expired(&mut self) -> Option<ExpiredItem<K, V>> {
198        self.expiration_queue.next().await.map(|key| {
199            let (value, _) = self.map.remove(key.get_ref()).unwrap();
200            (value, key)
201        })
202    }
203}
204
205impl<K, V> Default for ExpiringHashMap<K, V>
206where
207    K: Eq + Hash + Clone,
208{
209    fn default() -> Self {
210        Self {
211            map: HashMap::new(),
212            expiration_queue: DelayQueue::new(),
213        }
214    }
215}
216
217impl<K, V> fmt::Debug for ExpiringHashMap<K, V>
218where
219    K: Eq + Hash + Clone,
220{
221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
222        f.debug_struct("ExpiringHashMap").finish()
223    }
224}
225
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use tokio_test::{assert_pending, assert_ready, task};

    use super::*;

    /// Asserts that the poll result is ready and returns the inner value.
    fn unwrap_ready<T>(poll: Poll<T>) -> T {
        assert_ready!(&poll);
        match poll {
            Poll::Ready(val) => val,
            _ => unreachable!(),
        }
    }

    // An empty map must never leave the caller waiting: the future resolves
    // to `None` on the very first poll.
    #[test]
    fn next_expired_resolves_to_none_with_empty_map() {
        let mut map = ExpiringHashMap::<String, String>::default();
        let mut fut = task::spawn(map.next_expired());
        assert!(unwrap_ready(fut.poll()).is_none());
    }

    #[tokio::test]
    async fn next_expired_is_pending_with_a_non_empty_map() {
        let mut map = ExpiringHashMap::<String, String>::default();
        map.insert("key".to_owned(), "val".to_owned(), Duration::from_secs(1));
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }

    #[tokio::test]
    async fn next_expired_does_not_wake_when_the_value_is_available_upfront() {
        let mut map = ExpiringHashMap::<String, String>::default();

        // A deadline in the past means the item is expired from the start.
        let a_minute_ago = Instant::now() - Duration::from_secs(60);
        map.insert_at("key".to_owned(), "val".to_owned(), a_minute_ago);

        let mut fut = task::spawn(map.next_expired());
        assert_eq!(unwrap_ready(fut.poll()).unwrap().0, "val");
        assert!(!fut.is_woken());
    }

    #[tokio::test(start_paused = true)]
    async fn next_expired_wakes_and_becomes_ready_when_value_ttl_expires() {
        let mut map = ExpiringHashMap::<String, String>::default();

        let ttl = Duration::from_secs(1);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        let mut fut = task::spawn(map.next_expired());

        // At first, has to be pending.
        assert_pending!(fut.poll());
        assert!(!fut.is_woken());

        // Then, after deadline, has to be ready.
        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(fut.is_woken());
        let value = assert_ready!(fut.poll());
        let (key, value) = value
            .map(|(value, key)| (key.into_inner(), value))
            .expect("map definitively had entry that should be expired");
        assert_eq!(key, "key".to_owned());
        assert_eq!(value, "val".to_owned());
    }

    #[tokio::test]
    async fn next_expired_api_allows_inserting_items() {
        let mut map = ExpiringHashMap::<String, String>::default();

        // At first, the map is empty, so the future resolves to `None` right
        // away.
        let mut fut = task::spawn(map.next_expired());
        assert!(unwrap_ready(fut.poll()).is_none());
        // Release the mutable borrow of the map held by the future.
        drop(fut);

        // Insert an item.
        let ttl = Duration::from_secs(1000);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        // Then, after a value is inserted, a new future has to be pending.
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }
}