vector_core/transform/
mod.rs

1use std::{collections::HashMap, pin::Pin, sync::Arc};
2
3use futures::{Stream, StreamExt};
4
5use crate::{
6    config::OutputId,
7    event::{Event, EventArray, EventContainer, EventMutRef, into_event_stream},
8    schema::Definition,
9};
10
11mod outputs;
12#[cfg(feature = "lua")]
13pub mod runtime_transform;
14
15pub use outputs::{OutputBuffer, TransformOutputs, TransformOutputsBuf};
16
17/// Transforms come in two variants. Functions, or tasks.
18///
19/// While function transforms can be run out of order, or concurrently, task
20/// transforms act as a coordination or barrier point.
21pub enum Transform {
22    Function(Box<dyn FunctionTransform>),
23    Synchronous(Box<dyn SyncTransform>),
24    Task(Box<dyn TaskTransform<EventArray>>),
25}
26
27impl Transform {
28    /// Create a new function transform.
29    ///
30    /// These functions are "stateless" and can be run in parallel, without
31    /// regard for coordination.
32    ///
33    /// **Note:** You should prefer to implement this over [`TaskTransform`]
34    /// where possible.
35    pub fn function(v: impl FunctionTransform + 'static) -> Self {
36        Transform::Function(Box::new(v))
37    }
38
39    /// Create a new synchronous transform.
40    ///
41    /// This is a broader trait than the simple [`FunctionTransform`] in that it allows transforms
42    /// to write to multiple outputs. Those outputs must be known in advanced and returned via
43    /// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
44    /// considered a bug and will cause a panic.
45    pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
46        Transform::Synchronous(Box::new(v))
47    }
48
49    /// Create a new task transform.
50    ///
51    /// These tasks are coordinated, and map a stream of some `U` to some other
52    /// `T`.
53    ///
54    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
55    /// where possible.
56    pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
57        Transform::Task(Box::new(v))
58    }
59
60    /// Create a new task transform over individual `Event`s.
61    ///
62    /// These tasks are coordinated, and map a stream of some `U` to some other
63    /// `T`.
64    ///
65    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
66    /// where possible.
67    ///
68    /// # Panics
69    ///
70    /// TODO
71    pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
72        Transform::Task(Box::new(WrapEventTask(v)))
73    }
74
75    /// Transmute the inner transform into a task transform.
76    ///
77    /// # Panics
78    ///
79    /// If the transform is a [`FunctionTransform`] this will panic.
80    pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
81        match self {
82            Transform::Task(t) => t,
83            _ => {
84                panic!("Called `Transform::into_task` on something that was not a task variant.")
85            }
86        }
87    }
88}
89
90/// Transforms that are simple, and don't require attention to coordination.
91/// You can run them as simple functions over events in any order.
92///
93/// # Invariants
94///
95/// * It is an illegal invariant to implement `FunctionTransform` for a
96///   `TaskTransform` or vice versa.
97pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
98    fn transform(&mut self, output: &mut OutputBuffer, event: Event);
99}
100
101dyn_clone::clone_trait_object!(FunctionTransform);
102
103/// Transforms that tend to be more complicated runtime style components.
104///
105/// These require coordination and map a stream of some `T` to some `U`.
106///
107/// # Invariants
108///
109/// * It is an illegal invariant to implement `FunctionTransform` for a
110///   `TaskTransform` or vice versa.
111pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
112    fn transform(
113        self: Box<Self>,
114        task: Pin<Box<dyn Stream<Item = T> + Send>>,
115    ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
116
117    /// Wrap the transform task to process and emit individual
118    /// events. This is used to simplify testing task transforms.
119    fn transform_events(
120        self: Box<Self>,
121        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
122    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
123    where
124        T: From<Event>,
125        T::IntoIter: Send,
126    {
127        self.transform(task.map(Into::into).boxed())
128            .flat_map(into_event_stream)
129            .boxed()
130    }
131}
132
133/// Broader than the simple [`FunctionTransform`], this trait allows transforms to write to
134/// multiple outputs. Those outputs must be known in advanced and returned via
135/// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
136/// considered a bug and will cause a panic.
137pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
138    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
139
140    fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
141        for event in events.into_events() {
142            self.transform(event, output);
143        }
144    }
145}
146
147dyn_clone::clone_trait_object!(SyncTransform);
148
149impl<T> SyncTransform for T
150where
151    T: FunctionTransform,
152{
153    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
154        FunctionTransform::transform(
155            self,
156            output.primary_buffer.as_mut().expect("no default output"),
157            event,
158        );
159    }
160}
161
162// TODO: this is a bit ugly when we already have the above impl
163impl SyncTransform for Box<dyn FunctionTransform> {
164    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
165        FunctionTransform::transform(
166            self.as_mut(),
167            output.primary_buffer.as_mut().expect("no default output"),
168            event,
169        );
170    }
171}
172
173#[allow(clippy::implicit_hasher)]
174/// `event`: The event that will be updated
175/// `output_id`: The `output_id` that the current even is being sent to (will be used as the new `parent_id`)
176/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions, that will be used to lookup the new runtime definition of the event
177pub fn update_runtime_schema_definition(
178    mut event: EventMutRef,
179    output_id: &Arc<OutputId>,
180    log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
181) {
182    if let EventMutRef::Log(log) = &mut event {
183        if let Some(parent_component_id) = log.metadata().upstream_id() {
184            if let Some(definition) = log_schema_definitions.get(parent_component_id) {
185                log.metadata_mut().set_schema_definition(definition);
186            }
187        } else {
188            // there is no parent defined. That means this event originated from a component that
189            // isn't able to track the source, such as `reduce` or `lua`. In these cases, all of the
190            // schema definitions _must_ be the same, so the first one is picked
191            if let Some(definition) = log_schema_definitions.values().next() {
192                log.metadata_mut().set_schema_definition(definition);
193            }
194        }
195    }
196    event.metadata_mut().set_upstream_id(Arc::clone(output_id));
197}
198
199struct WrapEventTask<T>(T);
200
201impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
202    fn transform(
203        self: Box<Self>,
204        stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
205    ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
206        // This is an awful lot of boxes
207        let stream = stream.flat_map(into_event_stream).boxed();
208        Box::new(self.0).transform(stream).map(Into::into).boxed()
209    }
210}
211
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::{LogEvent, Metric, MetricKind, MetricValue};

    /// Pushing events of alternating types into an `OutputBuffer` extends the last inner element
    /// when the type matches and starts a new one otherwise.
    #[test]
    fn buffers_output() {
        /// Asserts the total event count (`len()`) and the number of inner elements (`.0.len()`).
        #[track_caller]
        fn assert_counts(buf: &OutputBuffer, events: usize, elements: usize) {
            assert_eq!(buf.len(), events);
            assert_eq!(buf.0.len(), elements);
        }

        let mut buf = OutputBuffer::default();
        assert_counts(&buf, 0, 0);

        // Push adds a new element
        buf.push(LogEvent::default().into());
        assert_counts(&buf, 1, 1);

        // Push of the same type adds to the existing element
        buf.push(LogEvent::default().into());
        assert_counts(&buf, 2, 1);

        // Push of a different type adds a new element
        let counter = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        buf.push(counter.into());
        assert_counts(&buf, 3, 2);

        // And pushing again adds a new element
        buf.push(LogEvent::default().into());
        assert_counts(&buf, 4, 3);
    }
}