vector_core/event/
array.rs

1#![deny(missing_docs)]
2//! This module contains the definitions and wrapper types for handling
3//! arrays of type `Event`, in the various forms they may appear.
4
5use std::{iter, slice, vec};
6
7use futures::{Stream, stream};
8#[cfg(test)]
9use quickcheck::{Arbitrary, Gen};
10use vector_buffers::EventCount;
11use vector_common::{
12    byte_size_of::ByteSizeOf,
13    finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
14    json_size::JsonSize,
15};
16
17use super::{
18    EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMetadata, EventMutRef,
19    EventRef, LogEvent, Metric, TraceEvent,
20};
21
22/// The type alias for an array of `LogEvent` elements.
23pub type LogArray = Vec<LogEvent>;
24
25/// The type alias for an array of `TraceEvent` elements.
26pub type TraceArray = Vec<TraceEvent>;
27
28/// The type alias for an array of `Metric` elements.
29pub type MetricArray = Vec<Metric>;
30
31/// The core trait to abstract over any type that may work as an array
32/// of events. This is effectively the same as the standard
33/// `IntoIterator<Item = Event>` implementations, but that would
34/// conflict with the base implementation for the type aliases below.
35pub trait EventContainer: ByteSizeOf + EstimatedJsonEncodedSizeOf {
36    /// The type of `Iterator` used to turn this container into events.
37    type IntoIter: Iterator<Item = Event>;
38
39    /// The number of events in this container.
40    fn len(&self) -> usize;
41
42    /// Is this container empty?
43    fn is_empty(&self) -> bool {
44        self.len() == 0
45    }
46
47    /// Turn this container into an iterator over `Event`.
48    fn into_events(self) -> Self::IntoIter;
49}
50
51/// Turn a container into a futures stream over the contained `Event`
52/// type.  This would ideally be implemented as a default method on
53/// `trait EventContainer`, but the required feature (associated type
54/// defaults) is still unstable.
55/// See <https://github.com/rust-lang/rust/issues/29661>
56pub fn into_event_stream(container: impl EventContainer) -> impl Stream<Item = Event> + Unpin {
57    stream::iter(container.into_events())
58}
59
60impl EventContainer for Event {
61    type IntoIter = iter::Once<Event>;
62
63    fn len(&self) -> usize {
64        1
65    }
66
67    fn is_empty(&self) -> bool {
68        false
69    }
70
71    fn into_events(self) -> Self::IntoIter {
72        iter::once(self)
73    }
74}
75
76impl EventContainer for LogEvent {
77    type IntoIter = iter::Once<Event>;
78
79    fn len(&self) -> usize {
80        1
81    }
82
83    fn is_empty(&self) -> bool {
84        false
85    }
86
87    fn into_events(self) -> Self::IntoIter {
88        iter::once(self.into())
89    }
90}
91
92impl EventContainer for Metric {
93    type IntoIter = iter::Once<Event>;
94
95    fn len(&self) -> usize {
96        1
97    }
98
99    fn is_empty(&self) -> bool {
100        false
101    }
102
103    fn into_events(self) -> Self::IntoIter {
104        iter::once(self.into())
105    }
106}
107
108impl EventContainer for LogArray {
109    type IntoIter = iter::Map<vec::IntoIter<LogEvent>, fn(LogEvent) -> Event>;
110
111    fn len(&self) -> usize {
112        self.len()
113    }
114
115    fn into_events(self) -> Self::IntoIter {
116        self.into_iter().map(Into::into)
117    }
118}
119
120impl EventContainer for MetricArray {
121    type IntoIter = iter::Map<vec::IntoIter<Metric>, fn(Metric) -> Event>;
122
123    fn len(&self) -> usize {
124        self.len()
125    }
126
127    fn into_events(self) -> Self::IntoIter {
128        self.into_iter().map(Into::into)
129    }
130}
131
132/// An array of one of the `Event` variants exclusively.
133#[derive(Clone, Debug, PartialEq)]
134pub enum EventArray {
135    /// An array of type `LogEvent`
136    Logs(LogArray),
137    /// An array of type `Metric`
138    Metrics(MetricArray),
139    /// An array of type `TraceEvent`
140    Traces(TraceArray),
141}
142
143impl EventArray {
144    /// Iterate over references to this array's events.
145    pub fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
146        match self {
147            Self::Logs(array) => EventArrayIter::Logs(array.iter()),
148            Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
149            Self::Traces(array) => EventArrayIter::Traces(array.iter()),
150        }
151    }
152
153    /// Iterate over mutable references to this array's events.
154    pub fn iter_events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
155        match self {
156            Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()),
157            Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()),
158            Self::Traces(array) => EventArrayIterMut::Traces(array.iter_mut()),
159        }
160    }
161
162    /// Iterate over references to the logs in this array.
163    pub fn iter_logs_mut(&mut self) -> impl Iterator<Item = &mut LogEvent> {
164        match self {
165            Self::Logs(array) => TypedArrayIterMut(Some(array.iter_mut())),
166            _ => TypedArrayIterMut(None),
167        }
168    }
169
170    /// Applies a closure to each event's metadata in this array.
171    pub fn for_each_metadata_mut(&mut self, mut f: impl FnMut(&mut EventMetadata)) {
172        match self {
173            Self::Logs(logs) => {
174                for log in logs {
175                    f(log.metadata_mut());
176                }
177            }
178            Self::Metrics(metrics) => {
179                for metric in metrics {
180                    f(metric.metadata_mut());
181                }
182            }
183            Self::Traces(traces) => {
184                for trace in traces {
185                    f(trace.metadata_mut());
186                }
187            }
188        }
189    }
190}
191
192impl From<Event> for EventArray {
193    fn from(event: Event) -> Self {
194        match event {
195            Event::Log(log) => Self::Logs(vec![log]),
196            Event::Metric(metric) => Self::Metrics(vec![metric]),
197            Event::Trace(trace) => Self::Traces(vec![trace]),
198        }
199    }
200}
201
202impl From<LogEvent> for EventArray {
203    fn from(log: LogEvent) -> Self {
204        Event::from(log).into()
205    }
206}
207
208impl From<Metric> for EventArray {
209    fn from(metric: Metric) -> Self {
210        Event::from(metric).into()
211    }
212}
213
214impl From<TraceEvent> for EventArray {
215    fn from(trace: TraceEvent) -> Self {
216        Event::from(trace).into()
217    }
218}
219
220impl From<LogArray> for EventArray {
221    fn from(array: LogArray) -> Self {
222        Self::Logs(array)
223    }
224}
225
226impl From<MetricArray> for EventArray {
227    fn from(array: MetricArray) -> Self {
228        Self::Metrics(array)
229    }
230}
231
232impl AddBatchNotifier for EventArray {
233    fn add_batch_notifier(&mut self, batch: BatchNotifier) {
234        match self {
235            Self::Logs(array) => array
236                .iter_mut()
237                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
238            Self::Metrics(array) => array
239                .iter_mut()
240                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
241            Self::Traces(array) => array
242                .iter_mut()
243                .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
244        }
245    }
246}
247
248impl ByteSizeOf for EventArray {
249    fn allocated_bytes(&self) -> usize {
250        match self {
251            Self::Logs(a) => a.allocated_bytes(),
252            Self::Metrics(a) => a.allocated_bytes(),
253            Self::Traces(a) => a.allocated_bytes(),
254        }
255    }
256}
257
258impl EstimatedJsonEncodedSizeOf for EventArray {
259    fn estimated_json_encoded_size_of(&self) -> JsonSize {
260        match self {
261            Self::Logs(v) => v.estimated_json_encoded_size_of(),
262            Self::Traces(v) => v.estimated_json_encoded_size_of(),
263            Self::Metrics(v) => v.estimated_json_encoded_size_of(),
264        }
265    }
266}
267
268impl EventCount for EventArray {
269    fn event_count(&self) -> usize {
270        match self {
271            Self::Logs(a) => a.len(),
272            Self::Metrics(a) => a.len(),
273            Self::Traces(a) => a.len(),
274        }
275    }
276}
277
278impl EventContainer for EventArray {
279    type IntoIter = EventArrayIntoIter;
280
281    fn len(&self) -> usize {
282        match self {
283            Self::Logs(a) => a.len(),
284            Self::Metrics(a) => a.len(),
285            Self::Traces(a) => a.len(),
286        }
287    }
288
289    fn into_events(self) -> Self::IntoIter {
290        match self {
291            Self::Logs(a) => EventArrayIntoIter::Logs(a.into_iter()),
292            Self::Metrics(a) => EventArrayIntoIter::Metrics(a.into_iter()),
293            Self::Traces(a) => EventArrayIntoIter::Traces(a.into_iter()),
294        }
295    }
296}
297
298impl EventDataEq for EventArray {
299    fn event_data_eq(&self, other: &Self) -> bool {
300        match (self, other) {
301            (Self::Logs(a), Self::Logs(b)) => a.event_data_eq(b),
302            (Self::Metrics(a), Self::Metrics(b)) => a.event_data_eq(b),
303            (Self::Traces(a), Self::Traces(b)) => a.event_data_eq(b),
304            _ => false,
305        }
306    }
307}
308
309impl Finalizable for EventArray {
310    fn take_finalizers(&mut self) -> EventFinalizers {
311        match self {
312            Self::Logs(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
313            Self::Metrics(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
314            Self::Traces(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
315        }
316    }
317}
318
319#[cfg(test)]
320impl Arbitrary for EventArray {
321    fn arbitrary(g: &mut Gen) -> Self {
322        let len = u8::arbitrary(g) as usize;
323        let choice: u8 = u8::arbitrary(g);
324        // Quickcheck can't derive Arbitrary for enums, see
325        // https://github.com/BurntSushi/quickcheck/issues/98
326        if choice.is_multiple_of(2) {
327            let mut logs = Vec::new();
328            for _ in 0..len {
329                logs.push(LogEvent::arbitrary(g));
330            }
331            EventArray::Logs(logs)
332        } else {
333            let mut metrics = Vec::new();
334            for _ in 0..len {
335                metrics.push(Metric::arbitrary(g));
336            }
337            EventArray::Metrics(metrics)
338        }
339    }
340
341    fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
342        match self {
343            EventArray::Logs(logs) => Box::new(logs.shrink().map(EventArray::Logs)),
344            EventArray::Metrics(metrics) => Box::new(metrics.shrink().map(EventArray::Metrics)),
345            EventArray::Traces(traces) => Box::new(traces.shrink().map(EventArray::Traces)),
346        }
347    }
348}
349
350/// The iterator type for `EventArray::iter_events`.
351#[derive(Debug)]
352pub enum EventArrayIter<'a> {
353    /// An iterator over type `LogEvent`.
354    Logs(slice::Iter<'a, LogEvent>),
355    /// An iterator over type `Metric`.
356    Metrics(slice::Iter<'a, Metric>),
357    /// An iterator over type `Trace`.
358    Traces(slice::Iter<'a, TraceEvent>),
359}
360
361impl<'a> Iterator for EventArrayIter<'a> {
362    type Item = EventRef<'a>;
363
364    fn next(&mut self) -> Option<Self::Item> {
365        match self {
366            Self::Logs(i) => i.next().map(EventRef::from),
367            Self::Metrics(i) => i.next().map(EventRef::from),
368            Self::Traces(i) => i.next().map(EventRef::from),
369        }
370    }
371}
372
373/// The iterator type for `EventArray::iter_events_mut`.
374#[derive(Debug)]
375pub enum EventArrayIterMut<'a> {
376    /// An iterator over type `LogEvent`.
377    Logs(slice::IterMut<'a, LogEvent>),
378    /// An iterator over type `Metric`.
379    Metrics(slice::IterMut<'a, Metric>),
380    /// An iterator over type `Trace`.
381    Traces(slice::IterMut<'a, TraceEvent>),
382}
383
384impl<'a> Iterator for EventArrayIterMut<'a> {
385    type Item = EventMutRef<'a>;
386
387    fn next(&mut self) -> Option<Self::Item> {
388        match self {
389            Self::Logs(i) => i.next().map(EventMutRef::from),
390            Self::Metrics(i) => i.next().map(EventMutRef::from),
391            Self::Traces(i) => i.next().map(EventMutRef::from),
392        }
393    }
394}
395
396/// The iterator type for `EventArray::into_events`.
397#[derive(Debug)]
398pub enum EventArrayIntoIter {
399    /// An iterator over type `LogEvent`.
400    Logs(vec::IntoIter<LogEvent>),
401    /// An iterator over type `Metric`.
402    Metrics(vec::IntoIter<Metric>),
403    /// An iterator over type `TraceEvent`.
404    Traces(vec::IntoIter<TraceEvent>),
405}
406
407impl Iterator for EventArrayIntoIter {
408    type Item = Event;
409
410    fn next(&mut self) -> Option<Self::Item> {
411        match self {
412            Self::Logs(i) => i.next().map(Into::into),
413            Self::Metrics(i) => i.next().map(Into::into),
414            Self::Traces(i) => i.next().map(Event::Trace),
415        }
416    }
417}
418
419struct TypedArrayIterMut<'a, T>(Option<slice::IterMut<'a, T>>);
420
421impl<'a, T> Iterator for TypedArrayIterMut<'a, T> {
422    type Item = &'a mut T;
423    fn next(&mut self) -> Option<Self::Item> {
424        self.0.as_mut().and_then(Iterator::next)
425    }
426}
427
428/// Intermediate buffer for conversion of a sequence of individual
429/// `Event`s into a sequence of `EventArray`s by coalescing contiguous
430/// events of the same type into one array. This is used by
431/// `events_into_array`.
432#[derive(Debug, Default)]
433pub struct EventArrayBuffer {
434    buffer: Option<EventArray>,
435    max_size: usize,
436}
437
438impl EventArrayBuffer {
439    fn new(max_size: Option<usize>) -> Self {
440        let max_size = max_size.unwrap_or(usize::MAX);
441        let buffer = None;
442        Self { buffer, max_size }
443    }
444
445    #[must_use]
446    fn push(&mut self, event: Event) -> Option<EventArray> {
447        match (event, &mut self.buffer) {
448            (Event::Log(event), Some(EventArray::Logs(array))) if array.len() < self.max_size => {
449                array.push(event);
450                None
451            }
452            (Event::Metric(event), Some(EventArray::Metrics(array)))
453                if array.len() < self.max_size =>
454            {
455                array.push(event);
456                None
457            }
458            (Event::Trace(event), Some(EventArray::Traces(array)))
459                if array.len() < self.max_size =>
460            {
461                array.push(event);
462                None
463            }
464            (event, current) => current.replace(EventArray::from(event)),
465        }
466    }
467
468    fn take(&mut self) -> Option<EventArray> {
469        self.buffer.take()
470    }
471}
472
473/// Convert the iterator over individual `Event`s into an iterator
474/// over coalesced `EventArray`s.
475pub fn events_into_arrays(
476    events: impl IntoIterator<Item = Event>,
477    max_size: Option<usize>,
478) -> impl Iterator<Item = EventArray> {
479    IntoEventArraysIter {
480        inner: events.into_iter().fuse(),
481        current: EventArrayBuffer::new(max_size),
482    }
483}
484
485/// Iterator type implementing `into_arrays`
486pub struct IntoEventArraysIter<I> {
487    inner: iter::Fuse<I>,
488    current: EventArrayBuffer,
489}
490
491impl<I: Iterator<Item = Event>> Iterator for IntoEventArraysIter<I> {
492    type Item = EventArray;
493    fn next(&mut self) -> Option<Self::Item> {
494        for event in self.inner.by_ref() {
495            if let Some(array) = self.current.push(event) {
496                return Some(array);
497            }
498        }
499        self.current.take()
500    }
501}