vector_core/transform/
mod.rs

1use std::{collections::HashMap, error, pin::Pin, sync::Arc, time::Instant};
2
3use futures::{Stream, StreamExt};
4use vector_common::{
5    EventDataEq,
6    byte_size_of::ByteSizeOf,
7    internal_event::{
8        self, CountByteSize, DEFAULT_OUTPUT, EventsSent, InternalEventHandle as _, Registered,
9        register,
10    },
11    json_size::JsonSize,
12};
13
14use crate::{
15    config,
16    config::{ComponentKey, OutputId},
17    event::{
18        EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventMutRef, EventRef,
19        into_event_stream,
20    },
21    fanout::{self, Fanout},
22    schema,
23    schema::Definition,
24};
25
// This submodule is only compiled when the `lua` cargo feature is enabled.
#[cfg(feature = "lua")]
pub mod runtime_transform;
28
/// Transforms come in three variants: functions, synchronous transforms, and tasks.
///
/// While function transforms can be run out of order, or concurrently, task
/// transforms act as a coordination or barrier point.
pub enum Transform {
    /// A boxed [`FunctionTransform`]: handles one event at a time and writes to a single
    /// [`OutputBuffer`].
    Function(Box<dyn FunctionTransform>),
    /// A boxed [`SyncTransform`]: like a function transform, but may write to multiple outputs.
    Synchronous(Box<dyn SyncTransform>),
    /// A boxed [`TaskTransform`] operating on a stream of [`EventArray`]s.
    Task(Box<dyn TaskTransform<EventArray>>),
}
38
39impl Transform {
40    /// Create a new function transform.
41    ///
42    /// These functions are "stateless" and can be run in parallel, without
43    /// regard for coordination.
44    ///
45    /// **Note:** You should prefer to implement this over [`TaskTransform`]
46    /// where possible.
47    pub fn function(v: impl FunctionTransform + 'static) -> Self {
48        Transform::Function(Box::new(v))
49    }
50
51    /// Create a new synchronous transform.
52    ///
53    /// This is a broader trait than the simple [`FunctionTransform`] in that it allows transforms
54    /// to write to multiple outputs. Those outputs must be known in advanced and returned via
55    /// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
56    /// considered a bug and will cause a panic.
57    pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
58        Transform::Synchronous(Box::new(v))
59    }
60
61    /// Create a new task transform.
62    ///
63    /// These tasks are coordinated, and map a stream of some `U` to some other
64    /// `T`.
65    ///
66    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
67    /// where possible.
68    pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
69        Transform::Task(Box::new(v))
70    }
71
72    /// Create a new task transform over individual `Event`s.
73    ///
74    /// These tasks are coordinated, and map a stream of some `U` to some other
75    /// `T`.
76    ///
77    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
78    /// where possible.
79    ///
80    /// # Panics
81    ///
82    /// TODO
83    pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
84        Transform::Task(Box::new(WrapEventTask(v)))
85    }
86
87    /// Transmute the inner transform into a task transform.
88    ///
89    /// # Panics
90    ///
91    /// If the transform is a [`FunctionTransform`] this will panic.
92    pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
93        match self {
94            Transform::Task(t) => t,
95            _ => {
96                panic!("Called `Transform::into_task` on something that was not a task variant.")
97            }
98        }
99    }
100}
101
/// Transforms that are simple, and don't require attention to coordination.
/// You can run them as simple functions over events in any order.
///
/// # Invariants
///
/// * It is invalid to implement `FunctionTransform` for a type that is also a
///   `TaskTransform`, or vice versa.
pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
    /// Handles a single `event`. The transform is given the event by value and a mutable
    /// `output` buffer to push its results into.
    fn transform(&mut self, output: &mut OutputBuffer, event: Event);
}

// Lets `Box<dyn FunctionTransform>` be cloned (via `dyn_clone`).
dyn_clone::clone_trait_object!(FunctionTransform);
114
115/// Transforms that tend to be more complicated runtime style components.
116///
117/// These require coordination and map a stream of some `T` to some `U`.
118///
119/// # Invariants
120///
121/// * It is an illegal invariant to implement `FunctionTransform` for a
122///   `TaskTransform` or vice versa.
123pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
124    fn transform(
125        self: Box<Self>,
126        task: Pin<Box<dyn Stream<Item = T> + Send>>,
127    ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
128
129    /// Wrap the transform task to process and emit individual
130    /// events. This is used to simplify testing task transforms.
131    fn transform_events(
132        self: Box<Self>,
133        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
134    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
135    where
136        T: From<Event>,
137        T::IntoIter: Send,
138    {
139        self.transform(task.map(Into::into).boxed())
140            .flat_map(into_event_stream)
141            .boxed()
142    }
143}
144
145/// Broader than the simple [`FunctionTransform`], this trait allows transforms to write to
146/// multiple outputs. Those outputs must be known in advanced and returned via
147/// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
148/// considered a bug and will cause a panic.
149pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
150    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
151
152    fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
153        for event in events.into_events() {
154            self.transform(event, output);
155        }
156    }
157}
158
159dyn_clone::clone_trait_object!(SyncTransform);
160
161impl<T> SyncTransform for T
162where
163    T: FunctionTransform,
164{
165    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
166        FunctionTransform::transform(
167            self,
168            output.primary_buffer.as_mut().expect("no default output"),
169            event,
170        );
171    }
172}
173
174// TODO: this is a bit ugly when we already have the above impl
175impl SyncTransform for Box<dyn FunctionTransform> {
176    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
177        FunctionTransform::transform(
178            self.as_mut(),
179            output.primary_buffer.as_mut().expect("no default output"),
180            event,
181        );
182    }
183}
184
/// Runtime state for a single output of a transform.
struct TransformOutput {
    /// Fanout that buffered event arrays are sent into.
    fanout: Fanout,
    /// Registered `EventsSent` handle, emitted with the event count and estimated JSON size
    /// after a buffer has been sent.
    events_sent: Registered<EventsSent>,
    /// Schema definitions keyed by the parent `OutputId`; used to pick the runtime schema
    /// definition of log events before they are sent.
    log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
    /// Identifier of this output; set as the upstream id of every event sent through it.
    output_id: Arc<OutputId>,
}
191
/// The set of outputs of a transform: an optional primary (unnamed) output plus any number
/// of named outputs.
pub struct TransformOutputs {
    /// The output specification this set was built from; reused to create matching buffers.
    outputs_spec: Vec<config::TransformOutput>,
    /// The output whose port is `None`, if one was configured.
    primary_output: Option<TransformOutput>,
    /// Outputs keyed by their port name.
    named_outputs: HashMap<String, TransformOutput>,
}
197
198impl TransformOutputs {
199    pub fn new(
200        outputs_in: Vec<config::TransformOutput>,
201        component_key: &ComponentKey,
202    ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
203        let outputs_spec = outputs_in.clone();
204        let mut primary_output = None;
205        let mut named_outputs = HashMap::new();
206        let mut controls = HashMap::new();
207
208        for output in outputs_in {
209            let (fanout, control) = Fanout::new();
210
211            let log_schema_definitions = output
212                .log_schema_definitions
213                .into_iter()
214                .map(|(id, definition)| (id, Arc::new(definition)))
215                .collect();
216
217            match output.port {
218                None => {
219                    primary_output = Some(TransformOutput {
220                        fanout,
221                        events_sent: register(EventsSent::from(internal_event::Output(Some(
222                            DEFAULT_OUTPUT.into(),
223                        )))),
224                        log_schema_definitions,
225                        output_id: Arc::new(OutputId {
226                            component: component_key.clone(),
227                            port: None,
228                        }),
229                    });
230                    controls.insert(None, control);
231                }
232                Some(name) => {
233                    named_outputs.insert(
234                        name.clone(),
235                        TransformOutput {
236                            fanout,
237                            events_sent: register(EventsSent::from(internal_event::Output(Some(
238                                name.clone().into(),
239                            )))),
240                            log_schema_definitions,
241                            output_id: Arc::new(OutputId {
242                                component: component_key.clone(),
243                                port: Some(name.clone()),
244                            }),
245                        },
246                    );
247                    controls.insert(Some(name.clone()), control);
248                }
249            }
250        }
251
252        let me = Self {
253            outputs_spec,
254            primary_output,
255            named_outputs,
256        };
257
258        (me, controls)
259    }
260
261    pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
262        TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
263    }
264
265    /// Sends the events in the buffer to their respective outputs.
266    ///
267    /// # Errors
268    ///
269    /// If an error occurs while sending events to their respective output, an error variant will be
270    /// returned detailing the cause.
271    pub async fn send(
272        &mut self,
273        buf: &mut TransformOutputsBuf,
274    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
275        if let Some(primary) = self.primary_output.as_mut() {
276            let buf = buf
277                .primary_buffer
278                .as_mut()
279                .unwrap_or_else(|| unreachable!("mismatched outputs"));
280            Self::send_single_buffer(buf, primary).await?;
281        }
282        for (key, buf) in &mut buf.named_buffers {
283            let output = self
284                .named_outputs
285                .get_mut(key)
286                .unwrap_or_else(|| unreachable!("unknown output"));
287            Self::send_single_buffer(buf, output).await?;
288        }
289        Ok(())
290    }
291
292    async fn send_single_buffer(
293        buf: &mut OutputBuffer,
294        output: &mut TransformOutput,
295    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
296        for event in buf.events_mut() {
297            update_runtime_schema_definition(
298                event,
299                &output.output_id,
300                &output.log_schema_definitions,
301            );
302        }
303        let count = buf.len();
304        let byte_size = buf.estimated_json_encoded_size_of();
305        buf.send(&mut output.fanout).await?;
306        output.events_sent.emit(CountByteSize(count, byte_size));
307        Ok(())
308    }
309}
310
311#[allow(clippy::implicit_hasher)]
312/// `event`: The event that will be updated
313/// `output_id`: The `output_id` that the current even is being sent to (will be used as the new `parent_id`)
314/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions, that will be used to lookup the new runtime definition of the event
315pub fn update_runtime_schema_definition(
316    mut event: EventMutRef,
317    output_id: &Arc<OutputId>,
318    log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
319) {
320    if let EventMutRef::Log(log) = &mut event {
321        if let Some(parent_component_id) = log.metadata().upstream_id() {
322            if let Some(definition) = log_schema_definitions.get(parent_component_id) {
323                log.metadata_mut().set_schema_definition(definition);
324            }
325        } else {
326            // there is no parent defined. That means this event originated from a component that
327            // isn't able to track the source, such as `reduce` or `lua`. In these cases, all of the
328            // schema definitions _must_ be the same, so the first one is picked
329            if let Some(definition) = log_schema_definitions.values().next() {
330                log.metadata_mut().set_schema_definition(definition);
331            }
332        }
333    }
334    event.metadata_mut().set_upstream_id(Arc::clone(output_id));
335}
336
/// Per-output event buffers for a transform: an optional primary (unnamed) buffer plus
/// buffers keyed by output name.
#[derive(Debug, Clone)]
pub struct TransformOutputsBuf {
    /// Buffer for the output whose port is `None`, if one was configured.
    primary_buffer: Option<OutputBuffer>,
    /// Buffers for the named outputs, keyed by port name.
    named_buffers: HashMap<String, OutputBuffer>,
}
342
343impl TransformOutputsBuf {
344    pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
345        let mut primary_buffer = None;
346        let mut named_buffers = HashMap::new();
347
348        for output in outputs_in {
349            match output.port {
350                None => {
351                    primary_buffer = Some(OutputBuffer::with_capacity(capacity));
352                }
353                Some(name) => {
354                    named_buffers.insert(name.clone(), OutputBuffer::default());
355                }
356            }
357        }
358
359        Self {
360            primary_buffer,
361            named_buffers,
362        }
363    }
364
365    /// Adds a new event to the named output buffer.
366    ///
367    /// # Panics
368    ///
369    /// Panics if there is no output with the given name.
370    pub fn push(&mut self, name: Option<&str>, event: Event) {
371        match name {
372            Some(name) => self.named_buffers.get_mut(name),
373            None => self.primary_buffer.as_mut(),
374        }
375        .expect("unknown output")
376        .push(event);
377    }
378
379    /// Drains the default output buffer.
380    ///
381    /// # Panics
382    ///
383    /// Panics if there is no default output.
384    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
385        self.primary_buffer
386            .as_mut()
387            .expect("no default output")
388            .drain()
389    }
390
391    /// Drains the named output buffer.
392    ///
393    /// # Panics
394    ///
395    /// Panics if there is no output with the given name.
396    pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
397        self.named_buffers
398            .get_mut(name)
399            .expect("unknown output")
400            .drain()
401    }
402
403    /// Takes the default output buffer.
404    ///
405    /// # Panics
406    ///
407    /// Panics if there is no default output.
408    pub fn take_primary(&mut self) -> OutputBuffer {
409        std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
410    }
411
412    pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
413        std::mem::take(&mut self.named_buffers)
414    }
415}
416
417impl ByteSizeOf for TransformOutputsBuf {
418    fn allocated_bytes(&self) -> usize {
419        self.primary_buffer.size_of()
420            + self
421                .named_buffers
422                .values()
423                .map(ByteSizeOf::size_of)
424                .sum::<usize>()
425    }
426}
427
/// A buffer of events, stored as a sequence of [`EventArray`]s.
///
/// Consecutive pushes of events of the same type are coalesced into one array.
#[derive(Debug, Default, Clone)]
pub struct OutputBuffer(Vec<EventArray>);
430
431impl OutputBuffer {
432    pub fn with_capacity(capacity: usize) -> Self {
433        Self(Vec::with_capacity(capacity))
434    }
435
436    pub fn push(&mut self, event: Event) {
437        // Coalesce multiple pushes of the same type into one array.
438        match (event, self.0.last_mut()) {
439            (Event::Log(log), Some(EventArray::Logs(logs))) => {
440                logs.push(log);
441            }
442            (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
443                metrics.push(metric);
444            }
445            (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
446                traces.push(trace);
447            }
448            (event, _) => {
449                self.0.push(event.into());
450            }
451        }
452    }
453
454    pub fn append(&mut self, events: &mut Vec<Event>) {
455        for event in events.drain(..) {
456            self.push(event);
457        }
458    }
459
460    pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
461        for event in events {
462            self.push(event);
463        }
464    }
465
466    pub fn is_empty(&self) -> bool {
467        self.0.is_empty()
468    }
469
470    pub fn len(&self) -> usize {
471        self.0.iter().map(EventArray::len).sum()
472    }
473
474    pub fn capacity(&self) -> usize {
475        self.0.capacity()
476    }
477
478    pub fn first(&self) -> Option<EventRef<'_>> {
479        self.0.first().and_then(|first| match first {
480            EventArray::Logs(l) => l.first().map(Into::into),
481            EventArray::Metrics(m) => m.first().map(Into::into),
482            EventArray::Traces(t) => t.first().map(Into::into),
483        })
484    }
485
486    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
487        self.0.drain(..).flat_map(EventArray::into_events)
488    }
489
490    async fn send(
491        &mut self,
492        output: &mut Fanout,
493    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
494        let send_start = Some(Instant::now());
495        for array in std::mem::take(&mut self.0) {
496            output.send(array, send_start).await?;
497        }
498
499        Ok(())
500    }
501
502    fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
503        self.0.iter().flat_map(EventArray::iter_events)
504    }
505
506    fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
507        self.0.iter_mut().flat_map(EventArray::iter_events_mut)
508    }
509
510    pub fn into_events(self) -> impl Iterator<Item = Event> {
511        self.0.into_iter().flat_map(EventArray::into_events)
512    }
513}
514
515impl ByteSizeOf for OutputBuffer {
516    fn allocated_bytes(&self) -> usize {
517        self.0.iter().map(ByteSizeOf::size_of).sum()
518    }
519}
520
521impl EventDataEq<Vec<Event>> for OutputBuffer {
522    fn event_data_eq(&self, other: &Vec<Event>) -> bool {
523        struct Comparator<'a>(EventRef<'a>);
524
525        impl PartialEq<&Event> for Comparator<'_> {
526            fn eq(&self, that: &&Event) -> bool {
527                self.0.event_data_eq(that)
528            }
529        }
530
531        self.iter_events().map(Comparator).eq(other.iter())
532    }
533}
534
535impl EstimatedJsonEncodedSizeOf for OutputBuffer {
536    fn estimated_json_encoded_size_of(&self) -> JsonSize {
537        self.0
538            .iter()
539            .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
540            .sum()
541    }
542}
543
544impl From<Vec<Event>> for OutputBuffer {
545    fn from(events: Vec<Event>) -> Self {
546        let mut result = Self::default();
547        result.extend(events.into_iter());
548        result
549    }
550}
551
/// Adapter that lets a `TaskTransform<Event>` be used as a `TaskTransform<EventArray>`.
struct WrapEventTask<T>(T);
553
554impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
555    fn transform(
556        self: Box<Self>,
557        stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
558    ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
559        // This is an awful lot of boxes
560        let stream = stream.flat_map(into_event_stream).boxed();
561        Box::new(self.0).transform(stream).map(Into::into).boxed()
562    }
563}
564
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::{LogEvent, Metric, MetricKind, MetricValue};

    /// Checks both the total number of events and the number of underlying arrays.
    #[track_caller]
    fn assert_sizes(buf: &OutputBuffer, events: usize, arrays: usize) {
        assert_eq!(buf.len(), events);
        assert_eq!(buf.0.len(), arrays);
    }

    #[test]
    fn buffers_output() {
        let mut buf = OutputBuffer::default();
        assert_sizes(&buf, 0, 0);

        // The first push starts a new array.
        buf.push(LogEvent::default().into());
        assert_sizes(&buf, 1, 1);

        // A second log is coalesced into the existing log array.
        buf.push(LogEvent::default().into());
        assert_sizes(&buf, 2, 1);

        // A metric has a different type, so it starts a new array.
        let metric = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        buf.push(metric.into());
        assert_sizes(&buf, 3, 2);

        // A log after a metric cannot join the earlier log array; it starts a third one.
        buf.push(LogEvent::default().into());
        assert_sizes(&buf, 4, 3);
    }
}
603}