vector_core/event/
array.rs

1#![deny(missing_docs)]
2//! This module contains the definitions and wrapper types for handling
3//! arrays of type `Event`, in the various forms they may appear.
4
5use std::{iter, slice, sync::Arc, vec};
6
7use futures::{stream, Stream};
8#[cfg(test)]
9use quickcheck::{Arbitrary, Gen};
10use vector_buffers::EventCount;
11use vector_common::{
12    byte_size_of::ByteSizeOf,
13    config::ComponentKey,
14    finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
15    json_size::JsonSize,
16};
17
18use super::{
19    EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
20    LogEvent, Metric, TraceEvent,
21};
22
23/// The type alias for an array of `LogEvent` elements.
24pub type LogArray = Vec<LogEvent>;
25
26/// The type alias for an array of `TraceEvent` elements.
27pub type TraceArray = Vec<TraceEvent>;
28
29/// The type alias for an array of `Metric` elements.
30pub type MetricArray = Vec<Metric>;
31
32/// The core trait to abstract over any type that may work as an array
33/// of events. This is effectively the same as the standard
34/// `IntoIterator<Item = Event>` implementations, but that would
35/// conflict with the base implementation for the type aliases below.
36pub trait EventContainer: ByteSizeOf + EstimatedJsonEncodedSizeOf {
37    /// The type of `Iterator` used to turn this container into events.
38    type IntoIter: Iterator<Item = Event>;
39
40    /// The number of events in this container.
41    fn len(&self) -> usize;
42
43    /// Is this container empty?
44    fn is_empty(&self) -> bool {
45        self.len() == 0
46    }
47
48    /// Turn this container into an iterator over `Event`.
49    fn into_events(self) -> Self::IntoIter;
50}
51
52/// Turn a container into a futures stream over the contained `Event`
53/// type.  This would ideally be implemented as a default method on
54/// `trait EventContainer`, but the required feature (associated type
55/// defaults) is still unstable.
56/// See <https://github.com/rust-lang/rust/issues/29661>
57pub fn into_event_stream(container: impl EventContainer) -> impl Stream<Item = Event> + Unpin {
58    stream::iter(container.into_events())
59}
60
61impl EventContainer for Event {
62    type IntoIter = iter::Once<Event>;
63
64    fn len(&self) -> usize {
65        1
66    }
67
68    fn is_empty(&self) -> bool {
69        false
70    }
71
72    fn into_events(self) -> Self::IntoIter {
73        iter::once(self)
74    }
75}
76
77impl EventContainer for LogEvent {
78    type IntoIter = iter::Once<Event>;
79
80    fn len(&self) -> usize {
81        1
82    }
83
84    fn is_empty(&self) -> bool {
85        false
86    }
87
88    fn into_events(self) -> Self::IntoIter {
89        iter::once(self.into())
90    }
91}
92
93impl EventContainer for Metric {
94    type IntoIter = iter::Once<Event>;
95
96    fn len(&self) -> usize {
97        1
98    }
99
100    fn is_empty(&self) -> bool {
101        false
102    }
103
104    fn into_events(self) -> Self::IntoIter {
105        iter::once(self.into())
106    }
107}
108
109impl EventContainer for LogArray {
110    type IntoIter = iter::Map<vec::IntoIter<LogEvent>, fn(LogEvent) -> Event>;
111
112    fn len(&self) -> usize {
113        self.len()
114    }
115
116    fn into_events(self) -> Self::IntoIter {
117        self.into_iter().map(Into::into)
118    }
119}
120
121impl EventContainer for MetricArray {
122    type IntoIter = iter::Map<vec::IntoIter<Metric>, fn(Metric) -> Event>;
123
124    fn len(&self) -> usize {
125        self.len()
126    }
127
128    fn into_events(self) -> Self::IntoIter {
129        self.into_iter().map(Into::into)
130    }
131}
132
133/// An array of one of the `Event` variants exclusively.
134#[derive(Clone, Debug, PartialEq)]
135pub enum EventArray {
136    /// An array of type `LogEvent`
137    Logs(LogArray),
138    /// An array of type `Metric`
139    Metrics(MetricArray),
140    /// An array of type `TraceEvent`
141    Traces(TraceArray),
142}
143
144impl EventArray {
145    /// Sets the `OutputId` in the metadata for all the events in this array.
146    pub fn set_output_id(&mut self, output_id: &Arc<ComponentKey>) {
147        match self {
148            EventArray::Logs(logs) => {
149                for log in logs {
150                    log.metadata_mut().set_source_id(Arc::clone(output_id));
151                }
152            }
153            EventArray::Metrics(metrics) => {
154                for metric in metrics {
155                    metric.metadata_mut().set_source_id(Arc::clone(output_id));
156                }
157            }
158            EventArray::Traces(traces) => {
159                for trace in traces {
160                    trace.metadata_mut().set_source_id(Arc::clone(output_id));
161                }
162            }
163        }
164    }
165
166    /// Sets the `source_type` in the metadata for all metric events in this array.
167    pub fn set_source_type(&mut self, source_type: &'static str) {
168        if let EventArray::Metrics(metrics) = self {
169            for metric in metrics {
170                metric.metadata_mut().set_source_type(source_type);
171            }
172        }
173    }
174
175    /// Iterate over references to this array's events.
176    pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
177        match self {
178            Self::Logs(array) => EventArrayIter::Logs(array.iter()),
179            Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
180            Self::Traces(array) => EventArrayIter::Traces(array.iter()),
181        }
182    }
183
184    /// Iterate over mutable references to this array's events.
185    pub fn iter_events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
186        match self {
187            Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()),
188            Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()),
189            Self::Traces(array) => EventArrayIterMut::Traces(array.iter_mut()),
190        }
191    }
192
193    /// Iterate over references to the logs in this array.
194    pub fn iter_logs_mut(&mut self) -> impl Iterator<Item = &mut LogEvent> {
195        match self {
196            Self::Logs(array) => TypedArrayIterMut(Some(array.iter_mut())),
197            _ => TypedArrayIterMut(None),
198        }
199    }
200}
201
202impl From<Event> for EventArray {
203    fn from(event: Event) -> Self {
204        match event {
205            Event::Log(log) => Self::Logs(vec![log]),
206            Event::Metric(metric) => Self::Metrics(vec![metric]),
207            Event::Trace(trace) => Self::Traces(vec![trace]),
208        }
209    }
210}
211
212impl From<LogEvent> for EventArray {
213    fn from(log: LogEvent) -> Self {
214        Event::from(log).into()
215    }
216}
217
218impl From<Metric> for EventArray {
219    fn from(metric: Metric) -> Self {
220        Event::from(metric).into()
221    }
222}
223
224impl From<TraceEvent> for EventArray {
225    fn from(trace: TraceEvent) -> Self {
226        Event::from(trace).into()
227    }
228}
229
230impl From<LogArray> for EventArray {
231    fn from(array: LogArray) -> Self {
232        Self::Logs(array)
233    }
234}
235
236impl From<MetricArray> for EventArray {
237    fn from(array: MetricArray) -> Self {
238        Self::Metrics(array)
239    }
240}
241
242impl AddBatchNotifier for EventArray {
243    fn add_batch_notifier(&mut self, batch: BatchNotifier) {
244        match self {
245            Self::Logs(array) => array
246                .iter_mut()
247                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
248            Self::Metrics(array) => array
249                .iter_mut()
250                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
251            Self::Traces(array) => array
252                .iter_mut()
253                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
254        }
255    }
256}
257
258impl ByteSizeOf for EventArray {
259    fn allocated_bytes(&self) -> usize {
260        match self {
261            Self::Logs(a) => a.allocated_bytes(),
262            Self::Metrics(a) => a.allocated_bytes(),
263            Self::Traces(a) => a.allocated_bytes(),
264        }
265    }
266}
267
268impl EstimatedJsonEncodedSizeOf for EventArray {
269    fn estimated_json_encoded_size_of(&self) -> JsonSize {
270        match self {
271            Self::Logs(v) => v.estimated_json_encoded_size_of(),
272            Self::Traces(v) => v.estimated_json_encoded_size_of(),
273            Self::Metrics(v) => v.estimated_json_encoded_size_of(),
274        }
275    }
276}
277
278impl EventCount for EventArray {
279    fn event_count(&self) -> usize {
280        match self {
281            Self::Logs(a) => a.len(),
282            Self::Metrics(a) => a.len(),
283            Self::Traces(a) => a.len(),
284        }
285    }
286}
287
288impl EventContainer for EventArray {
289    type IntoIter = EventArrayIntoIter;
290
291    fn len(&self) -> usize {
292        match self {
293            Self::Logs(a) => a.len(),
294            Self::Metrics(a) => a.len(),
295            Self::Traces(a) => a.len(),
296        }
297    }
298
299    fn into_events(self) -> Self::IntoIter {
300        match self {
301            Self::Logs(a) => EventArrayIntoIter::Logs(a.into_iter()),
302            Self::Metrics(a) => EventArrayIntoIter::Metrics(a.into_iter()),
303            Self::Traces(a) => EventArrayIntoIter::Traces(a.into_iter()),
304        }
305    }
306}
307
308impl EventDataEq for EventArray {
309    fn event_data_eq(&self, other: &Self) -> bool {
310        match (self, other) {
311            (Self::Logs(a), Self::Logs(b)) => a.event_data_eq(b),
312            (Self::Metrics(a), Self::Metrics(b)) => a.event_data_eq(b),
313            (Self::Traces(a), Self::Traces(b)) => a.event_data_eq(b),
314            _ => false,
315        }
316    }
317}
318
319impl Finalizable for EventArray {
320    fn take_finalizers(&mut self) -> EventFinalizers {
321        match self {
322            Self::Logs(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
323            Self::Metrics(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
324            Self::Traces(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
325        }
326    }
327}
328
329#[cfg(test)]
330impl Arbitrary for EventArray {
331    fn arbitrary(g: &mut Gen) -> Self {
332        let len = u8::arbitrary(g) as usize;
333        let choice: u8 = u8::arbitrary(g);
334        // Quickcheck can't derive Arbitrary for enums, see
335        // https://github.com/BurntSushi/quickcheck/issues/98
336        if choice % 2 == 0 {
337            let mut logs = Vec::new();
338            for _ in 0..len {
339                logs.push(LogEvent::arbitrary(g));
340            }
341            EventArray::Logs(logs)
342        } else {
343            let mut metrics = Vec::new();
344            for _ in 0..len {
345                metrics.push(Metric::arbitrary(g));
346            }
347            EventArray::Metrics(metrics)
348        }
349    }
350
351    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
352        match self {
353            EventArray::Logs(logs) => Box::new(logs.shrink().map(EventArray::Logs)),
354            EventArray::Metrics(metrics) => Box::new(metrics.shrink().map(EventArray::Metrics)),
355            EventArray::Traces(traces) => Box::new(traces.shrink().map(EventArray::Traces)),
356        }
357    }
358}
359
360/// The iterator type for `EventArray::iter_events`.
361#[derive(Debug)]
362pub enum EventArrayIter<'a> {
363    /// An iterator over type `LogEvent`.
364    Logs(slice::Iter<'a, LogEvent>),
365    /// An iterator over type `Metric`.
366    Metrics(slice::Iter<'a, Metric>),
367    /// An iterator over type `Trace`.
368    Traces(slice::Iter<'a, TraceEvent>),
369}
370
371impl<'a> Iterator for EventArrayIter<'a> {
372    type Item = EventRef<'a>;
373
374    fn next(&mut self) -> Option<Self::Item> {
375        match self {
376            Self::Logs(i) => i.next().map(EventRef::from),
377            Self::Metrics(i) => i.next().map(EventRef::from),
378            Self::Traces(i) => i.next().map(EventRef::from),
379        }
380    }
381}
382
383/// The iterator type for `EventArray::iter_events_mut`.
384#[derive(Debug)]
385pub enum EventArrayIterMut<'a> {
386    /// An iterator over type `LogEvent`.
387    Logs(slice::IterMut<'a, LogEvent>),
388    /// An iterator over type `Metric`.
389    Metrics(slice::IterMut<'a, Metric>),
390    /// An iterator over type `Trace`.
391    Traces(slice::IterMut<'a, TraceEvent>),
392}
393
394impl<'a> Iterator for EventArrayIterMut<'a> {
395    type Item = EventMutRef<'a>;
396
397    fn next(&mut self) -> Option<Self::Item> {
398        match self {
399            Self::Logs(i) => i.next().map(EventMutRef::from),
400            Self::Metrics(i) => i.next().map(EventMutRef::from),
401            Self::Traces(i) => i.next().map(EventMutRef::from),
402        }
403    }
404}
405
406/// The iterator type for `EventArray::into_events`.
407#[derive(Debug)]
408pub enum EventArrayIntoIter {
409    /// An iterator over type `LogEvent`.
410    Logs(vec::IntoIter<LogEvent>),
411    /// An iterator over type `Metric`.
412    Metrics(vec::IntoIter<Metric>),
413    /// An iterator over type `TraceEvent`.
414    Traces(vec::IntoIter<TraceEvent>),
415}
416
417impl Iterator for EventArrayIntoIter {
418    type Item = Event;
419
420    fn next(&mut self) -> Option<Self::Item> {
421        match self {
422            Self::Logs(i) => i.next().map(Into::into),
423            Self::Metrics(i) => i.next().map(Into::into),
424            Self::Traces(i) => i.next().map(Event::Trace),
425        }
426    }
427}
428
429struct TypedArrayIterMut<'a, T>(Option<slice::IterMut<'a, T>>);
430
431impl<'a, T> Iterator for TypedArrayIterMut<'a, T> {
432    type Item = &'a mut T;
433    fn next(&mut self) -> Option<Self::Item> {
434        self.0.as_mut().and_then(Iterator::next)
435    }
436}
437
438/// Intermediate buffer for conversion of a sequence of individual
439/// `Event`s into a sequence of `EventArray`s by coalescing contiguous
440/// events of the same type into one array. This is used by
441/// `events_into_array`.
442#[derive(Debug, Default)]
443pub struct EventArrayBuffer {
444    buffer: Option<EventArray>,
445    max_size: usize,
446}
447
448impl EventArrayBuffer {
449    fn new(max_size: Option<usize>) -> Self {
450        let max_size = max_size.unwrap_or(usize::MAX);
451        let buffer = None;
452        Self { buffer, max_size }
453    }
454
455    #[must_use]
456    fn push(&mut self, event: Event) -> Option<EventArray> {
457        match (event, &mut self.buffer) {
458            (Event::Log(event), Some(EventArray::Logs(array))) if array.len() < self.max_size => {
459                array.push(event);
460                None
461            }
462            (Event::Metric(event), Some(EventArray::Metrics(array)))
463                if array.len() < self.max_size =>
464            {
465                array.push(event);
466                None
467            }
468            (Event::Trace(event), Some(EventArray::Traces(array)))
469                if array.len() < self.max_size =>
470            {
471                array.push(event);
472                None
473            }
474            (event, current) => current.replace(EventArray::from(event)),
475        }
476    }
477
478    fn take(&mut self) -> Option<EventArray> {
479        self.buffer.take()
480    }
481}
482
483/// Convert the iterator over individual `Event`s into an iterator
484/// over coalesced `EventArray`s.
485pub fn events_into_arrays(
486    events: impl IntoIterator<Item = Event>,
487    max_size: Option<usize>,
488) -> impl Iterator<Item = EventArray> {
489    IntoEventArraysIter {
490        inner: events.into_iter().fuse(),
491        current: EventArrayBuffer::new(max_size),
492    }
493}
494
495/// Iterator type implementing `into_arrays`
496pub struct IntoEventArraysIter<I> {
497    inner: iter::Fuse<I>,
498    current: EventArrayBuffer,
499}
500
501impl<I: Iterator<Item = Event>> Iterator for IntoEventArraysIter<I> {
502    type Item = EventArray;
503    fn next(&mut self) -> Option<Self::Item> {
504        for event in self.inner.by_ref() {
505            if let Some(array) = self.current.push(event) {
506                return Some(array);
507            }
508        }
509        self.current.take()
510    }
511}