vector/
expiring_hash_map.rs

1//! Expiring Hash Map and related types. See [`ExpiringHashMap`].
2#![warn(missing_docs)]
3
4use std::{
5    borrow::Borrow,
6    collections::HashMap,
7    fmt,
8    hash::Hash,
9    time::{Duration, Instant},
10};
11
12use futures::StreamExt;
13use tokio_util::time::{delay_queue, DelayQueue};
14
/// An expired item: the value, paired with the key wrapped in its expiration
/// information.
pub type ExpiredItem<K, V> = (V, delay_queue::Expired<K>);
17
/// A [`HashMap`] that maintains deadlines for the keys via a [`DelayQueue`].
pub struct ExpiringHashMap<K, V> {
    // Each value is stored together with the handle of the key's entry in
    // `expiration_queue`, so the deadline can be reset or cancelled by key.
    map: HashMap<K, (V, delay_queue::Key)>,
    // Tracks the deadline of every key; a clone of the key is queued on
    // insertion and handed back when the deadline is reached.
    expiration_queue: DelayQueue<K>,
}
23
// Implemented for every `K` and `V`, with no bounds on either.
// NOTE(review): presumably sound because nothing in this type relies on its
// contents staying at a fixed memory address — confirm.
impl<K, V> Unpin for ExpiringHashMap<K, V> {}
25
26impl<K, V> ExpiringHashMap<K, V>
27where
28    K: Eq + Hash + Clone,
29{
30    /// Insert a new key with a TTL.
31    pub fn insert(&mut self, key: K, value: V, ttl: Duration) {
32        let delay_queue_key = self.expiration_queue.insert(key.clone(), ttl);
33        self.map.insert(key, (value, delay_queue_key));
34    }
35
36    /// Insert a new value with a deadline.
37    pub fn insert_at(&mut self, key: K, value: V, deadline: Instant) {
38        let delay_queue_key = self
39            .expiration_queue
40            .insert_at(key.clone(), deadline.into());
41        self.map.insert(key, (value, delay_queue_key));
42    }
43
44    /// Get a reference to the value by key.
45    pub fn get<Q>(&self, k: &Q) -> Option<&V>
46    where
47        K: Borrow<Q>,
48        Q: ?Sized + Hash + Eq,
49    {
50        self.map.get(k).map(|(v, _)| v)
51    }
52
53    /// Get a mut reference to the value by key.
54    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<&mut V>
55    where
56        K: Borrow<Q>,
57        Q: ?Sized + Hash + Eq,
58    {
59        self.map.get_mut(k).map(|&mut (ref mut v, _)| v)
60    }
61
62    /// Reset the deadline for a key, and return a mut ref to the value.
63    pub fn reset_at<Q>(&mut self, k: &Q, when: Instant) -> Option<&mut V>
64    where
65        K: Borrow<Q>,
66        Q: ?Sized + Hash + Eq,
67    {
68        let (value, delay_queue_key) = self.map.get_mut(k)?;
69        self.expiration_queue.reset_at(delay_queue_key, when.into());
70        Some(value)
71    }
72
73    /// Reset the key if it exists, returning the value and the expiration
74    /// information.
75    pub fn remove<Q>(&mut self, k: &Q) -> Option<ExpiredItem<K, V>>
76    where
77        K: Borrow<Q>,
78        Q: ?Sized + Hash + Eq,
79    {
80        let (value, expiration_queue_key) = self.map.remove(k)?;
81        let expired = self.expiration_queue.remove(&expiration_queue_key);
82        Some((value, expired))
83    }
84
85    /// Return an iterator over keys and values of ExpiringHashMap. Useful for
86    /// processing all values in ExpiringHashMap irrespective of expiration. This
87    /// may be required for processing shutdown or other operations.
88    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut V)> {
89        self.map.iter_mut().map(|(k, (v, _delayed_key))| (k, v))
90    }
91
92    /// Check whether the [`ExpiringHashMap`] is empty.
93    /// If it's empty, the `next_expired` function immediately resolves to
94    /// [`None`]. Be aware that this may cause a spinlock behaviour if the
95    /// `next_expired` is polled in a loop while [`ExpiringHashMap`] is empty.
96    /// See [`ExpiringHashMap::next_expired`] for more info.
97    pub fn is_empty(&self) -> bool {
98        self.expiration_queue.is_empty()
99    }
100
101    /// Returns the number of elements in the map.
102    pub fn len(&self) -> usize {
103        self.map.len()
104    }
105
106    /// If the [`ExpiringHashMap`] is empty, immediately returns `None`.
107    /// Otherwise, waits for the closest deadline, removes expired item and
108    /// returns it.
109    ///
110    /// Be aware that misuse of this function may cause a spinlock! If you want
111    /// to `select` on this future in a loop, be sure to check
112    /// [`ExpiringHashMap::is_empty`] and skip polling on
113    /// [`ExpiringHashMap::next_expired`] if the [`ExpiringHashMap`] is empty.
114    /// Otherwise, when the [`ExpiringHashMap`] is empty you'll effectively get
115    /// a spinlock on the first value insertion.
116    ///
117    /// We currently don't offer an API that would allow simply waiting for
118    /// expired items regardless of what state the [`ExpiringHashMap`] is.
119    /// This is a deliberate design decision, we went with it for the following
120    /// reasons:
121    /// 1. Use of `async fn`. One of the benefits of this API is that it relies
122    ///    only on `async fn`s, and doesn't require manual `Future`
123    ///    implementation. While this is not a problem in general, but there is
124    ///    some value with doing it this way. With a switch to `async` across
125    ///    our code base, the idea is that we should completely eliminate manual
126    ///    `Future` implementations and poll fns. This is controversial, but we
127    ///    decided to give it a try.
128    /// 2. We don't know all the use cases, and exposing this kind of API might
129    ///    make more sense, since it allows more flexibility.
130    ///    We were choosing between, effectively, the current "drain"-like API,
131    ///    and the "queue" like API.
132    ///    Current ("drain"-like) API waits on the deadline or returns `None`
133    ///    when there are no more items. Very similar how we [`Vec::drain`] iter
134    ///    works.
135    ///    The "queue"-like API would, pretty much, be simply waiting expired
136    ///    items to appear. In the case of empty [`ExpiringHashMap`], we would
137    ///    wait indefinitely - or until an item is inserted. This would be
138    ///    possible to carry on, for instance, from a sibling branch of a
139    ///    `select` statement, so the borrowing rules won't be a problem here.
140    /// 3. We went over the following alternative signature:
141    ///    ```ignore
142    ///    pub fn next_expired(&mut self) -> Option<impl Future<Outcome = Result<ExpiredItem<K, V>, Error>>> {...}
143    ///    ```
144    ///    This captures the API restrictions a bit better, and should provide
145    ///    less possibilities to misuse the API.
146    ///    We didn't pick this one because it's not an `async fn` and we wanted
147    ///    this, see (1) of this list. Furthermore, instead of doing a
148    ///    `select { _ = map.next_expired(), if !map.is_empty() => { ... } }`
149    ///    users would have to do
150    ///    `let exp = map.next_expired(); select { _ = exp.unwrap(), if exp.is_some() => { ... } }`,
151    ///    which is less readable and a bit harder to understand. Although it
152    ///    has a possibility of a nicer generalization if `select` macro
153    ///    supported a `Some(future)` kind of pattern matching, we decided to go
154    ///    with other solution for now.
155    ///
156    /// # Examples
157    ///
158    /// ```rust
159    /// # let rt = tokio::runtime::Runtime::new().unwrap();
160    /// # rt.block_on(async {
161    /// use vector::expiring_hash_map::ExpiringHashMap;
162    /// use std::time::Duration;
163    ///
164    /// let mut map: ExpiringHashMap<String, String> = ExpiringHashMap::default();
165    ///
166    /// loop {
167    ///     tokio::select! {
168    ///         // You need to ensure that this branch is disabled if the map
169    ///         // is empty! Not doing this will result in a spinlock.
170    ///         val = map.next_expired(), if !map.is_empty() => match val {
171    ///             None => unreachable!(), // we never poll the empty map in the first place!
172    ///             Some((val, _)) => {
173    ///                 println!("Expired: {}", val);
174    ///                 break;
175    ///             }
176    ///         },
177    ///         _ = tokio::time::sleep(Duration::from_millis(100)) => map.insert(
178    ///             "key".to_owned(),
179    ///             "val".to_owned(),
180    ///             Duration::from_millis(30),
181    ///         ),
182    ///     }
183    /// }
184    /// # });
185    /// ```
186    pub async fn next_expired(&mut self) -> Option<ExpiredItem<K, V>> {
187        self.expiration_queue.next().await.map(|key| {
188            let (value, _) = self.map.remove(key.get_ref()).unwrap();
189            (value, key)
190        })
191    }
192}
193
194impl<K, V> Default for ExpiringHashMap<K, V>
195where
196    K: Eq + Hash + Clone,
197{
198    fn default() -> Self {
199        Self {
200            map: HashMap::new(),
201            expiration_queue: DelayQueue::new(),
202        }
203    }
204}
205
206impl<K, V> fmt::Debug for ExpiringHashMap<K, V>
207where
208    K: Eq + Hash + Clone,
209{
210    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
211        f.debug_struct("ExpiringHashMap").finish()
212    }
213}
214
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use tokio_test::{assert_pending, assert_ready, task};

    use super::*;

    /// Extracts the value from a ready [`Poll`], panicking if it is pending.
    fn unwrap_ready<T>(poll: Poll<T>) -> T {
        assert_ready!(poll)
    }

    /// Polling an empty map must not block: it resolves to `None` right away.
    #[test]
    fn next_expired_is_ready_with_none_for_empty_map() {
        let mut map = ExpiringHashMap::<String, String>::default();
        let mut fut = task::spawn(map.next_expired());
        assert!(unwrap_ready(fut.poll()).is_none());
    }

    /// With an entry whose deadline is in the future, the future is pending.
    #[tokio::test]
    async fn next_expired_is_pending_with_a_non_empty_map() {
        let mut map = ExpiringHashMap::<String, String>::default();
        map.insert("key".to_owned(), "val".to_owned(), Duration::from_secs(1));
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }

    /// An entry whose deadline has already passed is returned on the first
    /// poll, without a wake-up being registered.
    #[tokio::test]
    async fn next_expired_does_not_wake_when_the_value_is_available_upfront() {
        let mut map = ExpiringHashMap::<String, String>::default();

        let a_minute_ago = Instant::now() - Duration::from_secs(60);
        map.insert_at("key".to_owned(), "val".to_owned(), a_minute_ago);

        let mut fut = task::spawn(map.next_expired());
        assert_eq!(unwrap_ready(fut.poll()).unwrap().0, "val");
        assert!(!fut.is_woken());
    }

    /// The pending future is woken once the TTL elapses and then yields the
    /// expired key and value.
    #[tokio::test(start_paused = true)]
    async fn next_expired_wakes_and_becomes_ready_when_value_ttl_expires() {
        let mut map = ExpiringHashMap::<String, String>::default();

        let ttl = Duration::from_secs(1);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        let mut fut = task::spawn(map.next_expired());

        // At first, has to be pending.
        assert_pending!(fut.poll());
        assert!(!fut.is_woken());

        // Then, after deadline, has to be ready.
        tokio::time::advance(Duration::from_secs(1)).await;
        assert!(fut.is_woken());
        let value = assert_ready!(fut.poll());
        let (key, value) = value
            .map(|(value, key)| (key.into_inner(), value))
            .expect("map definitively had entry that should be expired");
        assert_eq!(key, "key".to_owned());
        assert_eq!(value, "val".to_owned());
    }

    /// Dropping the `next_expired` future releases the borrow, so items can be
    /// inserted between polls.
    #[tokio::test]
    async fn next_expired_api_allows_inserting_items() {
        let mut map = ExpiringHashMap::<String, String>::default();

        // At first the map is empty, so the future resolves to `None`
        // immediately.
        let mut fut = task::spawn(map.next_expired());
        assert!(unwrap_ready(fut.poll()).is_none());
        drop(fut);

        // Insert an item.
        let ttl = Duration::from_secs(1000);
        map.insert("key".to_owned(), "val".to_owned(), ttl);

        // Now that a value with a far-away deadline is present, the future has
        // to be pending.
        let mut fut = task::spawn(map.next_expired());
        assert_pending!(fut.poll());
    }
}