vector_core/transform/outputs.rs

1use std::{collections::HashMap, error, sync::Arc, time::Instant};
2
3use vector_common::{
4    EventDataEq,
5    byte_size_of::ByteSizeOf,
6    internal_event::{
7        self, CountByteSize, DEFAULT_OUTPUT, EventsSent, InternalEventHandle as _, Registered,
8        register,
9    },
10    json_size::JsonSize,
11};
12
13use crate::{
14    config,
15    config::{ComponentKey, OutputId},
16    event::{EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventMutRef, EventRef},
17    fanout::{self, Fanout},
18    schema,
19};
20
21struct TransformOutput {
22    fanout: Fanout,
23    events_sent: Registered<EventsSent>,
24    log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
25    output_id: Arc<OutputId>,
26}
27
28pub struct TransformOutputs {
29    outputs_spec: Vec<config::TransformOutput>,
30    primary_output: Option<TransformOutput>,
31    named_outputs: HashMap<String, TransformOutput>,
32}
33
34impl TransformOutputs {
35    pub fn new(
36        outputs_in: Vec<config::TransformOutput>,
37        component_key: &ComponentKey,
38    ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
39        let outputs_spec = outputs_in.clone();
40        let mut primary_output = None;
41        let mut named_outputs = HashMap::new();
42        let mut controls = HashMap::new();
43
44        for output in outputs_in {
45            let (fanout, control) = Fanout::new();
46
47            let log_schema_definitions = output
48                .log_schema_definitions
49                .into_iter()
50                .map(|(id, definition)| (id, Arc::new(definition)))
51                .collect();
52
53            match output.port {
54                None => {
55                    primary_output = Some(TransformOutput {
56                        fanout,
57                        events_sent: register(EventsSent::from(internal_event::Output(Some(
58                            DEFAULT_OUTPUT.into(),
59                        )))),
60                        log_schema_definitions,
61                        output_id: Arc::new(OutputId {
62                            component: component_key.clone(),
63                            port: None,
64                        }),
65                    });
66                    controls.insert(None, control);
67                }
68                Some(name) => {
69                    named_outputs.insert(
70                        name.clone(),
71                        TransformOutput {
72                            fanout,
73                            events_sent: register(EventsSent::from(internal_event::Output(Some(
74                                name.clone().into(),
75                            )))),
76                            log_schema_definitions,
77                            output_id: Arc::new(OutputId {
78                                component: component_key.clone(),
79                                port: Some(name.clone()),
80                            }),
81                        },
82                    );
83                    controls.insert(Some(name.clone()), control);
84                }
85            }
86        }
87
88        let me = Self {
89            outputs_spec,
90            primary_output,
91            named_outputs,
92        };
93
94        (me, controls)
95    }
96
97    pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
98        TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
99    }
100
101    /// Sends the events in the buffer to their respective outputs.
102    ///
103    /// # Errors
104    ///
105    /// If an error occurs while sending events to their respective output, an error variant will be
106    /// returned detailing the cause.
107    pub async fn send(
108        &mut self,
109        buf: &mut TransformOutputsBuf,
110    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
111        if let Some(primary) = self.primary_output.as_mut() {
112            let Some(buf) = buf.primary_buffer.as_mut() else {
113                unreachable!("mismatched outputs");
114            };
115            Self::send_single_buffer(buf, primary).await?;
116        }
117        for (key, buf) in &mut buf.named_buffers {
118            let Some(output) = self.named_outputs.get_mut(key) else {
119                unreachable!("unknown output");
120            };
121            Self::send_single_buffer(buf, output).await?;
122        }
123        Ok(())
124    }
125
126    async fn send_single_buffer(
127        buf: &mut OutputBuffer,
128        output: &mut TransformOutput,
129    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
130        for event in buf.events_mut() {
131            super::update_runtime_schema_definition(
132                event,
133                &output.output_id,
134                &output.log_schema_definitions,
135            );
136        }
137        let count = buf.len();
138        let byte_size = buf.estimated_json_encoded_size_of();
139        buf.send(&mut output.fanout).await?;
140        output.events_sent.emit(CountByteSize(count, byte_size));
141        Ok(())
142    }
143}
144
145#[derive(Debug, Clone)]
146pub struct TransformOutputsBuf {
147    pub(super) primary_buffer: Option<OutputBuffer>,
148    pub(super) named_buffers: HashMap<String, OutputBuffer>,
149}
150
151impl TransformOutputsBuf {
152    pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
153        let mut primary_buffer = None;
154        let mut named_buffers = HashMap::new();
155
156        for output in outputs_in {
157            match output.port {
158                None => {
159                    primary_buffer = Some(OutputBuffer::with_capacity(capacity));
160                }
161                Some(name) => {
162                    named_buffers.insert(name.clone(), OutputBuffer::default());
163                }
164            }
165        }
166
167        Self {
168            primary_buffer,
169            named_buffers,
170        }
171    }
172
173    /// Adds a new event to the named output buffer.
174    ///
175    /// # Panics
176    ///
177    /// Panics if there is no output with the given name.
178    pub fn push(&mut self, name: Option<&str>, event: Event) {
179        match name {
180            Some(name) => self.named_buffers.get_mut(name),
181            None => self.primary_buffer.as_mut(),
182        }
183        .expect("unknown output")
184        .push(event);
185    }
186
187    /// Drains the default output buffer.
188    ///
189    /// # Panics
190    ///
191    /// Panics if there is no default output.
192    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
193        self.primary_buffer
194            .as_mut()
195            .expect("no default output")
196            .drain()
197    }
198
199    /// Drains the named output buffer.
200    ///
201    /// # Panics
202    ///
203    /// Panics if there is no output with the given name.
204    pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
205        self.named_buffers
206            .get_mut(name)
207            .expect("unknown output")
208            .drain()
209    }
210
211    /// Takes the default output buffer.
212    ///
213    /// # Panics
214    ///
215    /// Panics if there is no default output.
216    pub fn take_primary(&mut self) -> OutputBuffer {
217        std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
218    }
219
220    pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
221        std::mem::take(&mut self.named_buffers)
222    }
223
224    /// Applies `f` to each [`EventArray`] currently buffered in this outputs buffer.
225    ///
226    /// This is useful for cross-cutting instrumentation (e.g. latency timestamp propagation)
227    /// that needs mutable access to the buffered arrays before they are sent.
228    pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
229        if let Some(primary) = self.primary_buffer.as_mut() {
230            primary.for_each_array_mut(&mut f);
231        }
232        for buf in self.named_buffers.values_mut() {
233            buf.for_each_array_mut(&mut f);
234        }
235    }
236}
237
238impl ByteSizeOf for TransformOutputsBuf {
239    fn allocated_bytes(&self) -> usize {
240        self.primary_buffer.size_of()
241            + self
242                .named_buffers
243                .values()
244                .map(ByteSizeOf::size_of)
245                .sum::<usize>()
246    }
247}
248
249#[derive(Debug, Default, Clone)]
250pub struct OutputBuffer(pub(super) Vec<EventArray>);
251
252impl OutputBuffer {
253    pub fn with_capacity(capacity: usize) -> Self {
254        Self(Vec::with_capacity(capacity))
255    }
256
257    pub fn push(&mut self, event: Event) {
258        // Coalesce multiple pushes of the same type into one array.
259        match (event, self.0.last_mut()) {
260            (Event::Log(log), Some(EventArray::Logs(logs))) => {
261                logs.push(log);
262            }
263            (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
264                metrics.push(metric);
265            }
266            (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
267                traces.push(trace);
268            }
269            (event, _) => {
270                self.0.push(event.into());
271            }
272        }
273    }
274
275    pub fn append(&mut self, events: &mut Vec<Event>) {
276        for event in events.drain(..) {
277            self.push(event);
278        }
279    }
280
281    pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
282        for event in events {
283            self.push(event);
284        }
285    }
286
287    pub fn is_empty(&self) -> bool {
288        self.0.is_empty()
289    }
290
291    pub fn len(&self) -> usize {
292        self.0.iter().map(EventArray::len).sum()
293    }
294
295    pub fn capacity(&self) -> usize {
296        self.0.capacity()
297    }
298
299    pub fn first(&self) -> Option<EventRef<'_>> {
300        self.0.first().and_then(|first| match first {
301            EventArray::Logs(l) => l.first().map(Into::into),
302            EventArray::Metrics(m) => m.first().map(Into::into),
303            EventArray::Traces(t) => t.first().map(Into::into),
304        })
305    }
306
307    pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
308        self.0.drain(..).flat_map(EventArray::into_events)
309    }
310
311    /// Applies `f` to each [`EventArray`] currently held by this buffer.
312    pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
313        for array in &mut self.0 {
314            f(array);
315        }
316    }
317
318    async fn send(
319        &mut self,
320        output: &mut Fanout,
321    ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
322        let send_start = Some(Instant::now());
323        for array in std::mem::take(&mut self.0) {
324            output.send(array, send_start).await?;
325        }
326
327        Ok(())
328    }
329
330    fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
331        self.0.iter().flat_map(EventArray::iter_events)
332    }
333
334    fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
335        self.0.iter_mut().flat_map(EventArray::iter_events_mut)
336    }
337
338    pub fn into_events(self) -> impl Iterator<Item = Event> {
339        self.0.into_iter().flat_map(EventArray::into_events)
340    }
341}
342
343impl ByteSizeOf for OutputBuffer {
344    fn allocated_bytes(&self) -> usize {
345        self.0.iter().map(ByteSizeOf::size_of).sum()
346    }
347}
348
349impl EventDataEq<Vec<Event>> for OutputBuffer {
350    fn event_data_eq(&self, other: &Vec<Event>) -> bool {
351        struct Comparator<'a>(EventRef<'a>);
352
353        impl PartialEq<&Event> for Comparator<'_> {
354            fn eq(&self, that: &&Event) -> bool {
355                self.0.event_data_eq(that)
356            }
357        }
358
359        self.iter_events().map(Comparator).eq(other.iter())
360    }
361}
362
363impl EstimatedJsonEncodedSizeOf for OutputBuffer {
364    fn estimated_json_encoded_size_of(&self) -> JsonSize {
365        self.0
366            .iter()
367            .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
368            .sum()
369    }
370}
371
372impl From<Vec<Event>> for OutputBuffer {
373    fn from(events: Vec<Event>) -> Self {
374        let mut result = Self::default();
375        result.extend(events.into_iter());
376        result
377    }
378}