vector_stream/
expiration_map.rs1use async_stream::stream;
2use futures::{Stream, StreamExt};
3use std::time::Duration;
4
5#[derive(Default)]
6pub struct Emitter<T> {
7 values: Vec<T>,
8}
9
10impl<T> Emitter<T> {
11 pub fn new() -> Self {
12 Self { values: vec![] }
13 }
14 pub fn emit(&mut self, value: T) {
15 self.values.push(value);
16 }
17}
18
19pub fn map_with_expiration<S, T, M, E, F>(
22 initial_state: S,
23 input: impl Stream<Item = T> + 'static,
24 expiration_interval: Duration,
25 mut map_fn: M,
27 mut expiration_fn: E,
29 mut flush_fn: F,
31) -> impl Stream<Item = T>
32where
33 M: FnMut(&mut S, T, &mut Emitter<T>),
34 E: FnMut(&mut S, &mut Emitter<T>),
35 F: FnMut(&mut S, &mut Emitter<T>),
36{
37 let mut state = initial_state;
38 let mut flush_stream = tokio::time::interval(expiration_interval);
39
40 Box::pin(stream! {
41 futures_util::pin_mut!(input);
42 loop {
43 let mut emitter = Emitter::<T>::new();
44 let done = tokio::select! {
45 _ = flush_stream.tick() => {
46 expiration_fn(&mut state, &mut emitter);
47 false
48 }
49 maybe_event = input.next() => {
50 match maybe_event {
51 None => {
52 flush_fn(&mut state, &mut emitter);
53 true
54 }
55 Some(event) => {
56 map_fn(&mut state, event, &mut emitter);
57 false
58 }
59 }
60 }
61 };
62 yield futures::stream::iter(emitter.values.into_iter());
63 if done { break }
64 }
65
66 })
67 .flatten()
68}
69
70#[cfg(test)]
71mod test {
72 use super::*;
73
74 #[tokio::test]
75 async fn test_simple() {
76 let input = futures::stream::iter([1, 2, 3]);
77
78 let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
79 *state += event;
80 emitter.emit(*state);
81 };
82 let expiration_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
83 };
85 let flush_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
86 emitter.emit(*state);
87 };
88 let stream: Vec<i32> = map_with_expiration(
89 0_i32,
90 input,
91 Duration::from_secs(100),
92 map_fn,
93 expiration_fn,
94 flush_fn,
95 )
96 .take(4)
97 .collect()
98 .await;
99
100 assert_eq!(vec![1, 3, 6, 6], stream);
101 }
102
103 #[tokio::test]
104 async fn test_expiration() {
105 let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());
107
108 let map_fn = |state: &mut i32, event, emitter: &mut Emitter<i32>| {
109 *state += event;
110 emitter.emit(*state);
111 };
112 let expiration_fn = |state: &mut i32, emitter: &mut Emitter<i32>| {
113 emitter.emit(*state);
114 };
115 let flush_fn = |_state: &mut i32, _emitter: &mut Emitter<i32>| {
116 };
118 let stream: Vec<i32> = map_with_expiration(
119 0_i32,
120 input,
121 Duration::from_secs(1),
122 map_fn,
123 expiration_fn,
124 flush_fn,
125 )
126 .take(4)
127 .collect()
128 .await;
129
130 assert_eq!(vec![1, 3, 6, 6], stream);
131 }
132}