vector_stream/
expiration_map.rs1use std::time::Duration;
2
3use async_stream::stream;
4use futures::{Stream, StreamExt};
5
6#[derive(Default)]
7pub struct Emitter<T> {
8 values: Vec<T>,
9}
10
11impl<T> Emitter<T> {
12 pub fn new() -> Self {
13 Self { values: vec![] }
14 }
15 pub fn emit(&mut self, value: T) {
16 self.values.push(value);
17 }
18}
19
20pub fn map_with_expiration<S, T, M, E, F>(
23 initial_state: S,
24 input: impl Stream<Item = T> + 'static,
25 expiration_interval: Duration,
26 mut map_fn: M,
28 mut expiration_fn: E,
30 mut flush_fn: F,
32) -> impl Stream<Item = T>
33where
34 M: FnMut(&mut S, T, &mut Emitter<T>),
35 E: FnMut(&mut S, &mut Emitter<T>),
36 F: FnMut(&mut S, &mut Emitter<T>),
37{
38 let mut state = initial_state;
39 let mut flush_stream = tokio::time::interval(expiration_interval);
40
41 Box::pin(stream! {
42 futures_util::pin_mut!(input);
43 loop {
44 let mut emitter = Emitter::<T>::new();
45 let done = tokio::select! {
46 _ = flush_stream.tick() => {
47 expiration_fn(&mut state, &mut emitter);
48 false
49 }
50 maybe_event = input.next() => {
51 match maybe_event {
52 None => {
53 flush_fn(&mut state, &mut emitter);
54 true
55 }
56 Some(event) => {
57 map_fn(&mut state, event, &mut emitter);
58 false
59 }
60 }
61 }
62 };
63 yield futures::stream::iter(emitter.values.into_iter());
64 if done { break }
65 }
66
67 })
68 .flatten()
69}
70
71#[cfg(test)]
72mod test {
73 use super::*;
74
75 #[tokio::test]
76 async fn test_simple() {
77 let input = futures::stream::iter([1, 2, 3]);
78
79 let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
80 *state += event;
81 emitter.emit(*state);
82 };
83 let expiration_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
84 };
86 let flush_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
87 emitter.emit(*state);
88 };
89 let stream: Vec<i32> = map_with_expiration(
90 0_i32,
91 input,
92 Duration::from_secs(100),
93 map_fn,
94 expiration_fn,
95 flush_fn,
96 )
97 .take(4)
98 .collect()
99 .await;
100
101 assert_eq!(vec![1, 3, 6, 6], stream);
102 }
103
104 #[tokio::test]
105 async fn test_expiration() {
106 let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());
108
109 let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
110 *state += event;
111 emitter.emit(*state);
112 };
113 let expiration_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
114 emitter.emit(*state);
115 };
116 let flush_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
117 };
119 let stream: Vec<i32> = map_with_expiration(
120 0_i32,
121 input,
122 Duration::from_secs(1),
123 map_fn,
124 expiration_fn,
125 flush_fn,
126 )
127 .take(4)
128 .collect()
129 .await;
130
131 assert_eq!(vec![1, 3, 6, 6], stream);
132 }
133}