vector_stream/
expiration_map.rs

1use async_stream::stream;
2use futures::{Stream, StreamExt};
3use std::time::Duration;
4
/// Buffer that callbacks use to hand zero or more output values back to the
/// caller.
///
/// Values are kept in the order in which they were passed to
/// [`Emitter::emit`].
pub struct Emitter<T> {
    values: Vec<T>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound, but an empty buffer can be built for any `T`.
impl<T> Default for Emitter<T> {
    fn default() -> Self {
        Self { values: Vec::new() }
    }
}

impl<T> Emitter<T> {
    /// Creates an emitter with no buffered values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `value` to the end of the buffer.
    pub fn emit(&mut self, value: T) {
        self.values.push(value);
    }
}
18
19/// Similar to `stream.filter_map(..).flatten(..)` but also allows checking for expired events
20/// and flushing when the input stream ends.
21pub fn map_with_expiration<S, T, M, E, F>(
22    initial_state: S,
23    input: impl Stream<Item = T> + 'static,
24    expiration_interval: Duration,
25    // called for each event
26    mut map_fn: M,
27    // called periodically to allow expiring internal state
28    mut expiration_fn: E,
29    // called once at the end of the input stream
30    mut flush_fn: F,
31) -> impl Stream<Item = T>
32where
33    M: FnMut(&mut S, T, &mut Emitter<T>),
34    E: FnMut(&mut S, &mut Emitter<T>),
35    F: FnMut(&mut S, &mut Emitter<T>),
36{
37    let mut state = initial_state;
38    let mut flush_stream = tokio::time::interval(expiration_interval);
39
40    Box::pin(stream! {
41        futures_util::pin_mut!(input);
42              loop {
43                let mut emitter = Emitter::<T>::new();
44                let done = tokio::select! {
45                    _ = flush_stream.tick() => {
46                        expiration_fn(&mut state, &mut emitter);
47                        false
48                    }
49                    maybe_event = input.next() => {
50                      match maybe_event {
51                        None => {
52                            flush_fn(&mut state, &mut emitter);
53                            true
54                        }
55                        Some(event) => {
56                            map_fn(&mut state, event, &mut emitter);
57                            false
58                        }
59                      }
60                    }
61                };
62                yield futures::stream::iter(emitter.values.into_iter());
63                if done { break }
64              }
65
66    })
67    .flatten()
68}
69
#[cfg(test)]
mod test {
    use super::*;

    /// Adds `event` to the running total and emits the updated total.
    fn accumulate(total: &mut i32, event: i32, emitter: &mut Emitter<i32>) {
        *total += event;
        emitter.emit(*total);
    }

    /// Emits the current running total without modifying it.
    fn emit_total(total: &mut i32, emitter: &mut Emitter<i32>) {
        emitter.emit(*total);
    }

    /// Callback that intentionally does nothing.
    fn ignore(_total: &mut i32, _emitter: &mut Emitter<i32>) {}

    /// A finite input: each event emits the running total, and the flush at
    /// end-of-input emits the final total once more.
    #[tokio::test]
    async fn test_simple() {
        let input = futures::stream::iter([1, 2, 3]);

        let collected = map_with_expiration(
            0_i32,
            input,
            Duration::from_secs(100),
            accumulate,
            ignore,
            emit_total,
        )
        .take(4)
        .collect::<Vec<i32>>()
        .await;

        assert_eq!(vec![1, 3, 6, 6], collected);
    }

    /// An input that never ends: the fourth value has to come from the
    /// expiration callback, since the flush callback is never reached.
    #[tokio::test]
    async fn test_expiration() {
        let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());

        let collected = map_with_expiration(
            0_i32,
            input,
            Duration::from_secs(1),
            accumulate,
            emit_total,
            ignore,
        )
        .take(4)
        .collect::<Vec<i32>>()
        .await;

        assert_eq!(vec![1, 3, 6, 6], collected);
    }
}