vector_stream/
expiration_map.rs

1use std::time::Duration;
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5
6#[derive(Default)]
7pub struct Emitter<T> {
8    values: Vec<T>,
9}
10
11impl<T> Emitter<T> {
12    pub fn new() -> Self {
13        Self { values: vec![] }
14    }
15    pub fn emit(&mut self, value: T) {
16        self.values.push(value);
17    }
18}
19
20/// Similar to `stream.filter_map(..).flatten(..)` but also allows checking for expired events
21/// and flushing when the input stream ends.
22pub fn map_with_expiration<S, T, M, E, F>(
23    initial_state: S,
24    input: impl Stream<Item = T> + 'static,
25    expiration_interval: Duration,
26    // called for each event
27    mut map_fn: M,
28    // called periodically to allow expiring internal state
29    mut expiration_fn: E,
30    // called once at the end of the input stream
31    mut flush_fn: F,
32) -> impl Stream<Item = T>
33where
34    M: FnMut(&mut S, T, &mut Emitter<T>),
35    E: FnMut(&mut S, &mut Emitter<T>),
36    F: FnMut(&mut S, &mut Emitter<T>),
37{
38    let mut state = initial_state;
39    let mut flush_stream = tokio::time::interval(expiration_interval);
40
41    Box::pin(stream! {
42        futures_util::pin_mut!(input);
43              loop {
44                let mut emitter = Emitter::<T>::new();
45                let done = tokio::select! {
46                    _ = flush_stream.tick() => {
47                        expiration_fn(&mut state, &mut emitter);
48                        false
49                    }
50                    maybe_event = input.next() => {
51                      match maybe_event {
52                        None => {
53                            flush_fn(&mut state, &mut emitter);
54                            true
55                        }
56                        Some(event) => {
57                            map_fn(&mut state, event, &mut emitter);
58                            false
59                        }
60                      }
61                    }
62                };
63                yield futures::stream::iter(emitter.values.into_iter());
64                if done { break }
65              }
66
67    })
68    .flatten()
69}
70
71#[cfg(test)]
72mod test {
73    use super::*;
74
75    #[tokio::test]
76    async fn test_simple() {
77        let input = futures::stream::iter([1, 2, 3]);
78
79        let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
80            *state += event;
81            emitter.emit(*state);
82        };
83        let expiration_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
84            // do nothing
85        };
86        let flush_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
87            emitter.emit(*state);
88        };
89        let stream: Vec<i32> = map_with_expiration(
90            0_i32,
91            input,
92            Duration::from_secs(100),
93            map_fn,
94            expiration_fn,
95            flush_fn,
96        )
97        .take(4)
98        .collect()
99        .await;
100
101        assert_eq!(vec![1, 3, 6, 6], stream);
102    }
103
104    #[tokio::test]
105    async fn test_expiration() {
106        // an input that never ends (to test expiration)
107        let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());
108
109        let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
110            *state += event;
111            emitter.emit(*state);
112        };
113        let expiration_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
114            emitter.emit(*state);
115        };
116        let flush_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
117            // do nothing
118        };
119        let stream: Vec<i32> = map_with_expiration(
120            0_i32,
121            input,
122            Duration::from_secs(1),
123            map_fn,
124            expiration_fn,
125            flush_fn,
126        )
127        .take(4)
128        .collect()
129        .await;
130
131        assert_eq!(vec![1, 3, 6, 6], stream);
132    }
133}