vector_core/transform/mod.rs

1use std::{collections::HashMap, error, pin::Pin, sync::Arc, time::Instant};
2
3use futures::{Stream, StreamExt};
4use vector_common::internal_event::{
5    self, register, CountByteSize, EventsSent, InternalEventHandle as _, Registered, DEFAULT_OUTPUT,
6};
7use vector_common::{byte_size_of::ByteSizeOf, json_size::JsonSize, EventDataEq};
8
9use crate::config::{ComponentKey, OutputId};
10use crate::event::EventMutRef;
11use crate::schema::Definition;
12use crate::{
13    config,
14    event::{
15        into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef,
16    },
17    fanout::{self, Fanout},
18    schema,
19};
20
21#[cfg(feature = "lua")]
22pub mod runtime_transform;
23
24/// Transforms come in two variants. Functions, or tasks.
25///
26/// While function transforms can be run out of order, or concurrently, task
27/// transforms act as a coordination or barrier point.
28pub enum Transform {
29    Function(Box<dyn FunctionTransform>),
30    Synchronous(Box<dyn SyncTransform>),
31    Task(Box<dyn TaskTransform<EventArray>>),
32}
33
34impl Transform {
35    /// Create a new function transform.
36    ///
37    /// These functions are "stateless" and can be run in parallel, without
38    /// regard for coordination.
39    ///
40    /// **Note:** You should prefer to implement this over [`TaskTransform`]
41    /// where possible.
42    pub fn function(v: impl FunctionTransform + 'static) -> Self {
43        Transform::Function(Box::new(v))
44    }
45
46    /// Create a new synchronous transform.
47    ///
48    /// This is a broader trait than the simple [`FunctionTransform`] in that it allows transforms
49    /// to write to multiple outputs. Those outputs must be known in advanced and returned via
50    /// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
51    /// considered a bug and will cause a panic.
52    pub fn synchronous(v: impl SyncTransform + 'static) -> Self {
53        Transform::Synchronous(Box::new(v))
54    }
55
56    /// Create a new task transform.
57    ///
58    /// These tasks are coordinated, and map a stream of some `U` to some other
59    /// `T`.
60    ///
61    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
62    /// where possible.
63    pub fn task(v: impl TaskTransform<EventArray> + 'static) -> Self {
64        Transform::Task(Box::new(v))
65    }
66
67    /// Create a new task transform over individual `Event`s.
68    ///
69    /// These tasks are coordinated, and map a stream of some `U` to some other
70    /// `T`.
71    ///
72    /// **Note:** You should prefer to implement [`FunctionTransform`] over this
73    /// where possible.
74    ///
75    /// # Panics
76    ///
77    /// TODO
78    pub fn event_task(v: impl TaskTransform<Event> + 'static) -> Self {
79        Transform::Task(Box::new(WrapEventTask(v)))
80    }
81
82    /// Transmute the inner transform into a task transform.
83    ///
84    /// # Panics
85    ///
86    /// If the transform is a [`FunctionTransform`] this will panic.
87    pub fn into_task(self) -> Box<dyn TaskTransform<EventArray>> {
88        match self {
89            Transform::Task(t) => t,
90            _ => {
91                panic!("Called `Transform::into_task` on something that was not a task variant.")
92            }
93        }
94    }
95}
96
97/// Transforms that are simple, and don't require attention to coordination.
98/// You can run them as simple functions over events in any order.
99///
100/// # Invariants
101///
102/// * It is an illegal invariant to implement `FunctionTransform` for a
103///   `TaskTransform` or vice versa.
104pub trait FunctionTransform: Send + dyn_clone::DynClone + Sync {
105    fn transform(&mut self, output: &mut OutputBuffer, event: Event);
106}
107
108dyn_clone::clone_trait_object!(FunctionTransform);
109
110/// Transforms that tend to be more complicated runtime style components.
111///
112/// These require coordination and map a stream of some `T` to some `U`.
113///
114/// # Invariants
115///
116/// * It is an illegal invariant to implement `FunctionTransform` for a
117///   `TaskTransform` or vice versa.
118pub trait TaskTransform<T: EventContainer + 'static>: Send + 'static {
119    fn transform(
120        self: Box<Self>,
121        task: Pin<Box<dyn Stream<Item = T> + Send>>,
122    ) -> Pin<Box<dyn Stream<Item = T> + Send>>;
123
124    /// Wrap the transform task to process and emit individual
125    /// events. This is used to simplify testing task transforms.
126    fn transform_events(
127        self: Box<Self>,
128        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
129    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
130    where
131        T: From<Event>,
132        T::IntoIter: Send,
133    {
134        self.transform(task.map(Into::into).boxed())
135            .flat_map(into_event_stream)
136            .boxed()
137    }
138}
139
140/// Broader than the simple [`FunctionTransform`], this trait allows transforms to write to
141/// multiple outputs. Those outputs must be known in advanced and returned via
142/// `TransformConfig::outputs`. Attempting to send to any output not registered in advance is
143/// considered a bug and will cause a panic.
144pub trait SyncTransform: Send + dyn_clone::DynClone + Sync {
145    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf);
146
147    fn transform_all(&mut self, events: EventArray, output: &mut TransformOutputsBuf) {
148        for event in events.into_events() {
149            self.transform(event, output);
150        }
151    }
152}
153
154dyn_clone::clone_trait_object!(SyncTransform);
155
156impl<T> SyncTransform for T
157where
158    T: FunctionTransform,
159{
160    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
161        FunctionTransform::transform(
162            self,
163            output.primary_buffer.as_mut().expect("no default output"),
164            event,
165        );
166    }
167}
168
169// TODO: this is a bit ugly when we already have the above impl
170impl SyncTransform for Box<dyn FunctionTransform> {
171    fn transform(&mut self, event: Event, output: &mut TransformOutputsBuf) {
172        FunctionTransform::transform(
173            self.as_mut(),
174            output.primary_buffer.as_mut().expect("no default output"),
175            event,
176        );
177    }
178}
179
180struct TransformOutput {
181    fanout: Fanout,
182    events_sent: Registered<EventsSent>,
183    log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
184    output_id: Arc<OutputId>,
185}
186
187pub struct TransformOutputs {
188    outputs_spec: Vec<config::TransformOutput>,
189    primary_output: Option<TransformOutput>,
190    named_outputs: HashMap<String, TransformOutput>,
191}
192
193impl TransformOutputs {
194    pub fn new(
195        outputs_in: Vec<config::TransformOutput>,
196        component_key: &ComponentKey,
197    ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
198        let outputs_spec = outputs_in.clone();
199        let mut primary_output = None;
200        let mut named_outputs = HashMap::new();
201        let mut controls = HashMap::new();
202
203        for output in outputs_in {
204            let (fanout, control) = Fanout::new();
205
206            let log_schema_definitions = output
207                .log_schema_definitions
208                .into_iter()
209                .map(|(id, definition)| (id, Arc::new(definition)))
210                .collect();
211
212            match output.port {
213                None => {
214                    primary_output = Some(TransformOutput {
215                        fanout,
216                        events_sent: register(EventsSent::from(internal_event::Output(Some(
217                            DEFAULT_OUTPUT.into(),
218                        )))),
219                        log_schema_definitions,
220                        output_id: Arc::new(OutputId {
221                            component: component_key.clone(),
222                            port: None,
223                        }),
224                    });
225                    controls.insert(None, control);
226                }
227                Some(name) => {
228                    named_outputs.insert(
229                        name.clone(),
230                        TransformOutput {
231                            fanout,
232                            events_sent: register(EventsSent::from(internal_event::Output(Some(
233                                name.clone().into(),
234                            )))),
235                            log_schema_definitions,
236                            output_id: Arc::new(OutputId {
237                                component: component_key.clone(),
238                                port: Some(name.clone()),
239                            }),
240                        },
241                    );
242                    controls.insert(Some(name.clone()), control);
243                }
244            }
245        }
246
247        let me = Self {
248            outputs_spec,
249            primary_output,
250            named_outputs,
251        };
252
253        (me, controls)
254    }
255
256    pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
257        TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
258    }
259
260    /// Sends the events in the buffer to their respective outputs.
261    ///
262    /// # Errors
263    ///
264    /// If an error occurs while sending events to their respective output, an error variant will be
265    /// returned detailing the cause.
266    pub async fn send(
267        &mut self,
268        buf: &mut TransformOutputsBuf,
269    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
270        if let Some(primary) = self.primary_output.as_mut() {
271            let buf = buf
272                .primary_buffer
273                .as_mut()
274                .unwrap_or_else(|| unreachable!("mismatched outputs"));
275            Self::send_single_buffer(buf, primary).await?;
276        }
277        for (key, buf) in &mut buf.named_buffers {
278            let output = self
279                .named_outputs
280                .get_mut(key)
281                .unwrap_or_else(|| unreachable!("unknown output"));
282            Self::send_single_buffer(buf, output).await?;
283        }
284        Ok(())
285    }
286
287    async fn send_single_buffer(
288        buf: &mut OutputBuffer,
289        output: &mut TransformOutput,
290    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
291        for event in buf.events_mut() {
292            update_runtime_schema_definition(
293                event,
294                &output.output_id,
295                &output.log_schema_definitions,
296            );
297        }
298        let count = buf.len();
299        let byte_size = buf.estimated_json_encoded_size_of();
300        buf.send(&mut output.fanout).await?;
301        output.events_sent.emit(CountByteSize(count, byte_size));
302        Ok(())
303    }
304}
305
306#[allow(clippy::implicit_hasher)]
307/// `event`: The event that will be updated
308/// `output_id`: The `output_id` that the current even is being sent to (will be used as the new `parent_id`)
309/// `log_schema_definitions`: A mapping of parent `OutputId` to definitions, that will be used to lookup the new runtime definition of the event
310pub fn update_runtime_schema_definition(
311    mut event: EventMutRef,
312    output_id: &Arc<OutputId>,
313    log_schema_definitions: &HashMap<OutputId, Arc<Definition>>,
314) {
315    if let EventMutRef::Log(log) = &mut event {
316        if let Some(parent_component_id) = log.metadata().upstream_id() {
317            if let Some(definition) = log_schema_definitions.get(parent_component_id) {
318                log.metadata_mut().set_schema_definition(definition);
319            }
320        } else {
321            // there is no parent defined. That means this event originated from a component that
322            // isn't able to track the source, such as `reduce` or `lua`. In these cases, all of the
323            // schema definitions _must_ be the same, so the first one is picked
324            if let Some(definition) = log_schema_definitions.values().next() {
325                log.metadata_mut().set_schema_definition(definition);
326            }
327        }
328    }
329    event.metadata_mut().set_upstream_id(Arc::clone(output_id));
330}
331
332#[derive(Debug, Clone)]
333pub struct TransformOutputsBuf {
334    primary_buffer: Option<OutputBuffer>,
335    named_buffers: HashMap<String, OutputBuffer>,
336}
337
338impl TransformOutputsBuf {
339    pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
340        let mut primary_buffer = None;
341        let mut named_buffers = HashMap::new();
342
343        for output in outputs_in {
344            match output.port {
345                None => {
346                    primary_buffer = Some(OutputBuffer::with_capacity(capacity));
347                }
348                Some(name) => {
349                    named_buffers.insert(name.clone(), OutputBuffer::default());
350                }
351            }
352        }
353
354        Self {
355            primary_buffer,
356            named_buffers,
357        }
358    }
359
360    /// Adds a new event to the named output buffer.
361    ///
362    /// # Panics
363    ///
364    /// Panics if there is no output with the given name.
365    pub fn push(&mut self, name: Option<&str>, event: Event) {
366        match name {
367            Some(name) => self.named_buffers.get_mut(name),
368            None => self.primary_buffer.as_mut(),
369        }
370        .expect("unknown output")
371        .push(event);
372    }
373
374    /// Drains the default output buffer.
375    ///
376    /// # Panics
377    ///
378    /// Panics if there is no default output.
379    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
380        self.primary_buffer
381            .as_mut()
382            .expect("no default output")
383            .drain()
384    }
385
386    /// Drains the named output buffer.
387    ///
388    /// # Panics
389    ///
390    /// Panics if there is no output with the given name.
391    pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
392        self.named_buffers
393            .get_mut(name)
394            .expect("unknown output")
395            .drain()
396    }
397
398    /// Takes the default output buffer.
399    ///
400    /// # Panics
401    ///
402    /// Panics if there is no default output.
403    pub fn take_primary(&mut self) -> OutputBuffer {
404        std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
405    }
406
407    pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
408        std::mem::take(&mut self.named_buffers)
409    }
410}
411
412impl ByteSizeOf for TransformOutputsBuf {
413    fn allocated_bytes(&self) -> usize {
414        self.primary_buffer.size_of()
415            + self
416                .named_buffers
417                .values()
418                .map(ByteSizeOf::size_of)
419                .sum::<usize>()
420    }
421}
422
423#[derive(Debug, Default, Clone)]
424pub struct OutputBuffer(Vec<EventArray>);
425
426impl OutputBuffer {
427    pub fn with_capacity(capacity: usize) -> Self {
428        Self(Vec::with_capacity(capacity))
429    }
430
431    pub fn push(&mut self, event: Event) {
432        // Coalesce multiple pushes of the same type into one array.
433        match (event, self.0.last_mut()) {
434            (Event::Log(log), Some(EventArray::Logs(logs))) => {
435                logs.push(log);
436            }
437            (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
438                metrics.push(metric);
439            }
440            (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
441                traces.push(trace);
442            }
443            (event, _) => {
444                self.0.push(event.into());
445            }
446        }
447    }
448
449    pub fn append(&mut self, events: &mut Vec<Event>) {
450        for event in events.drain(..) {
451            self.push(event);
452        }
453    }
454
455    pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
456        for event in events {
457            self.push(event);
458        }
459    }
460
461    pub fn is_empty(&self) -> bool {
462        self.0.is_empty()
463    }
464
465    pub fn len(&self) -> usize {
466        self.0.iter().map(EventArray::len).sum()
467    }
468
469    pub fn capacity(&self) -> usize {
470        self.0.capacity()
471    }
472
473    pub fn first(&self) -> Option<EventRef> {
474        self.0.first().and_then(|first| match first {
475            EventArray::Logs(l) => l.first().map(Into::into),
476            EventArray::Metrics(m) => m.first().map(Into::into),
477            EventArray::Traces(t) => t.first().map(Into::into),
478        })
479    }
480
481    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
482        self.0.drain(..).flat_map(EventArray::into_events)
483    }
484
485    async fn send(
486        &mut self,
487        output: &mut Fanout,
488    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
489        let send_start = Some(Instant::now());
490        for array in std::mem::take(&mut self.0) {
491            output.send(array, send_start).await?;
492        }
493
494        Ok(())
495    }
496
497    fn iter_events(&self) -> impl Iterator<Item = EventRef> {
498        self.0.iter().flat_map(EventArray::iter_events)
499    }
500
501    fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
502        self.0.iter_mut().flat_map(EventArray::iter_events_mut)
503    }
504
505    pub fn into_events(self) -> impl Iterator<Item = Event> {
506        self.0.into_iter().flat_map(EventArray::into_events)
507    }
508}
509
510impl ByteSizeOf for OutputBuffer {
511    fn allocated_bytes(&self) -> usize {
512        self.0.iter().map(ByteSizeOf::size_of).sum()
513    }
514}
515
516impl EventDataEq<Vec<Event>> for OutputBuffer {
517    fn event_data_eq(&self, other: &Vec<Event>) -> bool {
518        struct Comparator<'a>(EventRef<'a>);
519
520        impl PartialEq<&Event> for Comparator<'_> {
521            fn eq(&self, that: &&Event) -> bool {
522                self.0.event_data_eq(that)
523            }
524        }
525
526        self.iter_events().map(Comparator).eq(other.iter())
527    }
528}
529
530impl EstimatedJsonEncodedSizeOf for OutputBuffer {
531    fn estimated_json_encoded_size_of(&self) -> JsonSize {
532        self.0
533            .iter()
534            .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
535            .sum()
536    }
537}
538
539impl From<Vec<Event>> for OutputBuffer {
540    fn from(events: Vec<Event>) -> Self {
541        let mut result = Self::default();
542        result.extend(events.into_iter());
543        result
544    }
545}
546
547struct WrapEventTask<T>(T);
548
549impl<T: TaskTransform<Event> + Send + 'static> TaskTransform<EventArray> for WrapEventTask<T> {
550    fn transform(
551        self: Box<Self>,
552        stream: Pin<Box<dyn Stream<Item = EventArray> + Send>>,
553    ) -> Pin<Box<dyn Stream<Item = EventArray> + Send>> {
554        // This is an awful lot of boxes
555        let stream = stream.flat_map(into_event_stream).boxed();
556        Box::new(self.0).transform(stream).map(Into::into).boxed()
557    }
558}
559
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::{LogEvent, Metric, MetricKind, MetricValue};

    /// Asserts that `buf` holds `events` events spread across `arrays` event arrays.
    #[track_caller]
    fn assert_layout(buf: &OutputBuffer, events: usize, arrays: usize) {
        assert_eq!(buf.len(), events);
        assert_eq!(buf.0.len(), arrays);
    }

    #[test]
    fn buffers_output() {
        let mut buf = OutputBuffer::default();
        assert_layout(&buf, 0, 0);

        // The first push starts a new array.
        buf.push(LogEvent::default().into());
        assert_layout(&buf, 1, 1);

        // A push of the same event type is coalesced into the trailing array.
        buf.push(LogEvent::default().into());
        assert_layout(&buf, 2, 1);

        // A push of a different event type starts another array.
        let counter = Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        buf.push(counter.into());
        assert_layout(&buf, 3, 2);

        // Switching back to logs cannot reuse the earlier log array, so a third array starts.
        buf.push(LogEvent::default().into());
        assert_layout(&buf, 4, 3);
    }
}