vector_core/transform/runtime_transform/
mod.rs1mod vec_stream;
2
3use std::{future::ready, pin::Pin, time::Duration};
4
5use futures::{
6 stream::{self, BoxStream},
7 FutureExt, Stream, StreamExt,
8};
9use tokio::time;
10use tokio_stream::wrappers::IntervalStream;
11use vec_stream::VecStreamExt;
12
13use super::{OutputBuffer, TaskTransform};
14use crate::event::Event;
15
16#[derive(Clone, Copy, Debug)]
18pub struct Timer {
19 pub id: u32,
20 pub interval: Duration,
21}
22
23pub trait RuntimeTransform {
25 fn hook_init<F>(&mut self, _emit_fn: F)
27 where
28 F: FnMut(Event),
29 {
30 }
31
32 fn hook_process<F>(&mut self, event: Event, emit_fn: F)
34 where
35 F: FnMut(Event);
36
37 fn hook_shutdown<F>(&mut self, _emit_fn: F)
39 where
40 F: FnMut(Event),
41 {
42 }
43
44 fn timer_handler<F>(&mut self, _timer: Timer, _emit_fn: F)
46 where
47 F: FnMut(Event),
48 {
49 }
50
51 fn timers(&self) -> Vec<Timer> {
53 Vec::new()
54 }
55
56 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
57 let mut maybe = None;
58 self.hook_process(event, |event| maybe = Some(event));
59 output.extend(maybe.into_iter());
60 }
61}
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug)]
65enum Message {
66 Init,
67 Process(Event),
68 Shutdown,
69 Timer(Timer),
70}
71
72impl<T> TaskTransform<Event> for T
73where
74 T: RuntimeTransform + Send + 'static,
75{
76 fn transform(
77 mut self: Box<Self>,
78 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
79 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
80 where
81 Self: 'static,
82 {
83 let timers = self.timers();
84 let mut is_shutdown: bool = false; Box::pin(
90 input_rx
91 .map(Message::Process)
92 .fuse()
93 .into_future()
94 .map(move |(first, rest)| {
95 let init_msg = stream::once(ready(Message::Init));
97 let first_event = first.map_or_else(
99 || stream::empty().boxed(),
100 |msg| stream::once(ready(msg)).boxed(),
101 );
102 let rest_events_and_shutdown_msg =
104 rest.chain(stream::once(ready(Message::Shutdown)));
105 let timer_msgs = make_timer_msgs_stream(timers);
107
108 init_msg
109 .chain(first_event)
110 .chain(
111 rest_events_and_shutdown_msg
115 .select_weak(timer_msgs.chain(stream::pending())),
116 )
117 .boxed()
118 })
119 .into_stream()
120 .flatten()
121 .map(move |msg| {
122 let mut acc = Vec::new(); if !is_shutdown {
124 match msg {
125 Message::Init => self.hook_init(|event| acc.push(event)),
126 Message::Process(event) => {
127 self.hook_process(event, |event| acc.push(event));
128 }
129 Message::Shutdown => {
130 self.hook_shutdown(|event| acc.push(event));
131 is_shutdown = true;
132 }
133 Message::Timer(timer) => {
134 self.timer_handler(timer, |event| acc.push(event));
135 }
136 }
137 }
138 stream::iter(acc).boxed()
139 })
140 .flatten()
141 .boxed(),
142 )
143 }
144}
145
146fn make_timer_msgs_stream(timers: Vec<Timer>) -> BoxStream<'static, Message> {
147 let streams = timers.into_iter().map(|timer| {
148 IntervalStream::new(time::interval(timer.interval)).map(move |_| Message::Timer(timer))
149 });
150 stream::select_all(streams).boxed()
151}