vector_core/transform/runtime_transform/
mod.rs

1mod vec_stream;
2
3use std::{future::ready, pin::Pin, time::Duration};
4
5use futures::{
6    stream::{self, BoxStream},
7    FutureExt, Stream, StreamExt,
8};
9use tokio::time;
10use tokio_stream::wrappers::IntervalStream;
11use vec_stream::VecStreamExt;
12
13use super::{OutputBuffer, TaskTransform};
14use crate::event::Event;
15
/// Description of a single user-defined timer.
///
/// The value is small and `Copy`, so it is passed around by value: one copy
/// travels with every tick the timer produces.
#[derive(Clone, Copy, Debug)]
pub struct Timer {
    /// Numeric identifier of the timer. This module never interprets it; it is
    /// only carried along with the timer.
    pub id: u32,
    /// Period between ticks, handed to `tokio::time::interval` when the timer
    /// stream is built.
    pub interval: Duration,
}
22
23/// A trait representing a runtime running user-defined code.
24pub trait RuntimeTransform {
25    /// Call user-defined "init" hook.
26    fn hook_init<F>(&mut self, _emit_fn: F)
27    where
28        F: FnMut(Event),
29    {
30    }
31
32    /// Call user-defined "process" hook.
33    fn hook_process<F>(&mut self, event: Event, emit_fn: F)
34    where
35        F: FnMut(Event);
36
37    /// Call user-defined "shutdown" hook.
38    fn hook_shutdown<F>(&mut self, _emit_fn: F)
39    where
40        F: FnMut(Event),
41    {
42    }
43
44    /// Call user-defined timer handler.
45    fn timer_handler<F>(&mut self, _timer: Timer, _emit_fn: F)
46    where
47        F: FnMut(Event),
48    {
49    }
50
51    /// Return (static) list of user-defined timers.
52    fn timers(&self) -> Vec<Timer> {
53        Vec::new()
54    }
55
56    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
57        let mut maybe = None;
58        self.hook_process(event, |event| maybe = Some(event));
59        output.extend(maybe.into_iter());
60    }
61}
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug)]
65enum Message {
66    Init,
67    Process(Event),
68    Shutdown,
69    Timer(Timer),
70}
71
72impl<T> TaskTransform<Event> for T
73where
74    T: RuntimeTransform + Send + 'static,
75{
76    fn transform(
77        mut self: Box<Self>,
78        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
79    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
80    where
81        Self: 'static,
82    {
83        let timers = self.timers();
84        let mut is_shutdown: bool = false; // TODO: consider using an enum describing the state instead of a
85                                           // a single boolean variable.
86                                           // It is used to prevent timers to emit messages after the source
87                                           // stream stopped.
88
89        Box::pin(
90            input_rx
91                .map(Message::Process)
92                .fuse()
93                .into_future()
94                .map(move |(first, rest)| {
95                    // The first message is always `Message::Init`.
96                    let init_msg = stream::once(ready(Message::Init));
97                    // After it comes the first event, if any.
98                    let first_event = first.map_or_else(
99                        || stream::empty().boxed(),
100                        |msg| stream::once(ready(msg)).boxed(),
101                    );
102                    // Then all other events followed by `Message::Shutdown` message
103                    let rest_events_and_shutdown_msg =
104                        rest.chain(stream::once(ready(Message::Shutdown)));
105                    // A stream of `Message::Timer(..)` events generated by timers.
106                    let timer_msgs = make_timer_msgs_stream(timers);
107
108                    init_msg
109                        .chain(first_event)
110                        .chain(
111                            // We need to finish when `rest_events_and_shutdown_msg` finishes so
112                            // not to hang on timers, but not finish when `timer_msgs` finishes
113                            // as there may not be any timer.
114                            rest_events_and_shutdown_msg
115                                .select_weak(timer_msgs.chain(stream::pending())),
116                        )
117                        .boxed()
118                })
119                .into_stream()
120                .flatten()
121                .map(move |msg| {
122                    let mut acc = Vec::new(); // TODO: create a stream adaptor to avoid buffering all events
123                    if !is_shutdown {
124                        match msg {
125                            Message::Init => self.hook_init(|event| acc.push(event)),
126                            Message::Process(event) => {
127                                self.hook_process(event, |event| acc.push(event));
128                            }
129                            Message::Shutdown => {
130                                self.hook_shutdown(|event| acc.push(event));
131                                is_shutdown = true;
132                            }
133                            Message::Timer(timer) => {
134                                self.timer_handler(timer, |event| acc.push(event));
135                            }
136                        }
137                    }
138                    stream::iter(acc).boxed()
139                })
140                .flatten()
141                .boxed(),
142        )
143    }
144}
145
146fn make_timer_msgs_stream(timers: Vec<Timer>) -> BoxStream<'static, Message> {
147    let streams = timers.into_iter().map(|timer| {
148        IntervalStream::new(time::interval(timer.interval)).map(move |_| Message::Timer(timer))
149    });
150    stream::select_all(streams).boxed()
151}