vector_core/transform/
mod.rs1use std::{collections::HashMap, pin::Pin, sync::Arc};
2
3use futures::{Stream, StreamExt};
4
5use crate::{
6 config::OutputId,
7 event::{Event, EventArray, EventContainer, EventMutRef, into_event_stream},
8 schema::Definition,
9};
10
11mod outputs;
12#[cfg(feature = "lua")]
13pub mod runtime_transform;
14
15pub use outputs::{OutputBuffer, TransformOutputs, TransformOutputsBuf};
16
17pub enum Transform {
22 Function(Box<dyn FunctionTransform>),
23 Synchronous(Box<dyn SyncTransform>),
24 Task(Box<dyn TaskTransform<EventArray>>),
25}
26
27impl Transform {
28 pub fn function(v: impl FunctionTransform + 'static) -> Self {
36 Transform::Function(Box::new(v))
37 }
38
39 pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
46 Transform::Synchronous(Box::new(v))
47 }
48
49 pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
57 Transform::Task(Box::new(v))
58 }
59
60 pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
72 Transform::Task(Box::new(WrapEventTask(v)))
73 }
74
75 pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
81 match self {
82 Transform::Task(t) => t,
83 _ => {
84 panic!("Called `Transform::into_task` on something that was not a task variant.")
85 }
86 }
87 }
88}
89
90pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
98 fn transform(&mut self, output: &mut OutputBuffer, event: Event);
99}
100
101dyn_clone::clone_trait_object!(FunctionTransform);
102
103pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
112 fn transform(
113 self: Box<Self>,
114 task: Pin<Box<dyn Stream<Item = T> + Send>>,
115 ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
116
117 fn transform_events(
120 self: Box<Self>,
121 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
122 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
123 where
124 T: From<Event>,
125 T::IntoIter: Send,
126 {
127 self.transform(task.map(Into::into).boxed())
128 .flat_map(into_event_stream)
129 .boxed()
130 }
131}
132
133pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
138 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
139
140 fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
141 for event in events.into_events() {
142 self.transform(event, output);
143 }
144 }
145}
146
147dyn_clone::clone_trait_object!(SyncTransform);
148
149impl<T> SyncTransform for T
150where
151 T: FunctionTransform,
152{
153 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
154 FunctionTransform::transform(
155 self,
156 output.primary_buffer.as_mut().expect("no default output"),
157 event,
158 );
159 }
160}
161
162impl SyncTransform for Box<dyn FunctionTransform> {
164 fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
165 FunctionTransform::transform(
166 self.as_mut(),
167 output.primary_buffer.as_mut().expect("no default output"),
168 event,
169 );
170 }
171}
172
173#[allow(clippy::implicit_hasher)]
174pub fn update_runtime_schema_definition(
178 mut event: EventMutRef,
179 output_id: &Arc<OutputId>,
180 log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
181) {
182 if let EventMutRef::Log(log) = &mut event {
183 if let Some(parent_component_id) = log.metadata().upstream_id() {
184 if let Some(definition) = log_schema_definitions.get(parent_component_id) {
185 log.metadata_mut().set_schema_definition(definition);
186 }
187 } else {
188 if let Some(definition) = log_schema_definitions.values().next() {
192 log.metadata_mut().set_schema_definition(definition);
193 }
194 }
195 }
196 event.metadata_mut().set_upstream_id(Arc::clone(output_id));
197}
198
199struct WrapEventTask<T>(T);
200
201impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
202 fn transform(
203 self: Box<Self>,
204 stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
205 ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
206 let stream = stream.flat_map(into_event_stream).boxed();
208 Box::new(self.0).transform(stream).map(Into::into).boxed()
209 }
210}
211
212#[cfg(test)]
213mod test {
214 use super::*;
215 use crate::event::{LogEvent, Metric, MetricKind, MetricValue};
216
217 #[test]
218 fn buffers_output() {
219 let mut buf = OutputBuffer::default();
220 assert_eq!(buf.len(), 0);
221 assert_eq!(buf.0.len(), 0);
222
223 buf.push(LogEvent::default().into());
225 assert_eq!(buf.len(), 1);
226 assert_eq!(buf.0.len(), 1);
227
228 buf.push(LogEvent::default().into());
230 assert_eq!(buf.len(), 2);
231 assert_eq!(buf.0.len(), 1);
232
233 buf.push(
235 Metric::new(
236 "name",
237 MetricKind::Absolute,
238 MetricValue::Counter { value: 1.0 },
239 )
240 .into(),
241 );
242 assert_eq!(buf.len(), 3);
243 assert_eq!(buf.0.len(), 2);
244
245 buf.push(LogEvent::default().into());
247 assert_eq!(buf.len(), 4);
248 assert_eq!(buf.0.len(), 3);
249 }
250}