1#![deny(missing_docs)]
2use std::{iter, slice, vec};
6
7use futures::{Stream, stream};
8#[cfg(test)]
9use quickcheck::{Arbitrary, Gen};
10use vector_buffers::EventCount;
11use vector_common::{
12 byte_size_of::ByteSizeOf,
13 finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
14 json_size::JsonSize,
15};
16
17use super::{
18 EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMetadata, EventMutRef,
19 EventRef, LogEvent, Metric, TraceEvent,
20};
21
22pub type LogArray = Vec<LogEvent>;
24
25pub type TraceArray = Vec<TraceEvent>;
27
28pub type MetricArray = Vec<Metric>;
30
31pub trait EventContainer: ByteSizeOf + EstimatedJsonEncodedSizeOf {
36 type IntoIter: Iterator<Item = Event>;
38
39 fn len(&self) -> usize;
41
42 fn is_empty(&self) -> bool {
44 self.len() == 0
45 }
46
47 fn into_events(self) -> Self::IntoIter;
49}
50
51pub fn into_event_stream(container: impl EventContainer) -> impl Stream<Item = Event> + Unpin {
57 stream::iter(container.into_events())
58}
59
60impl EventContainer for Event {
61 type IntoIter = iter::Once<Event>;
62
63 fn len(&self) -> usize {
64 1
65 }
66
67 fn is_empty(&self) -> bool {
68 false
69 }
70
71 fn into_events(self) -> Self::IntoIter {
72 iter::once(self)
73 }
74}
75
76impl EventContainer for LogEvent {
77 type IntoIter = iter::Once<Event>;
78
79 fn len(&self) -> usize {
80 1
81 }
82
83 fn is_empty(&self) -> bool {
84 false
85 }
86
87 fn into_events(self) -> Self::IntoIter {
88 iter::once(self.into())
89 }
90}
91
92impl EventContainer for Metric {
93 type IntoIter = iter::Once<Event>;
94
95 fn len(&self) -> usize {
96 1
97 }
98
99 fn is_empty(&self) -> bool {
100 false
101 }
102
103 fn into_events(self) -> Self::IntoIter {
104 iter::once(self.into())
105 }
106}
107
108impl EventContainer for LogArray {
109 type IntoIter = iter::Map<vec::IntoIter<LogEvent>, fn(LogEvent) -> Event>;
110
111 fn len(&self) -> usize {
112 self.len()
113 }
114
115 fn into_events(self) -> Self::IntoIter {
116 self.into_iter().map(Into::into)
117 }
118}
119
120impl EventContainer for MetricArray {
121 type IntoIter = iter::Map<vec::IntoIter<Metric>, fn(Metric) -> Event>;
122
123 fn len(&self) -> usize {
124 self.len()
125 }
126
127 fn into_events(self) -> Self::IntoIter {
128 self.into_iter().map(Into::into)
129 }
130}
131
132#[derive(Clone, Debug, PartialEq)]
134pub enum EventArray {
135 Logs(LogArray),
137 Metrics(MetricArray),
139 Traces(TraceArray),
141}
142
143impl EventArray {
144 pub fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
146 match self {
147 Self::Logs(array) => EventArrayIter::Logs(array.iter()),
148 Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
149 Self::Traces(array) => EventArrayIter::Traces(array.iter()),
150 }
151 }
152
153 pub fn iter_events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
155 match self {
156 Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()),
157 Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()),
158 Self::Traces(array) => EventArrayIterMut::Traces(array.iter_mut()),
159 }
160 }
161
162 pub fn iter_logs_mut(&mut self) -> impl Iterator<Item = &mut LogEvent> {
164 match self {
165 Self::Logs(array) => TypedArrayIterMut(Some(array.iter_mut())),
166 _ => TypedArrayIterMut(None),
167 }
168 }
169
170 pub fn for_each_metadata_mut(&mut self, mut f: impl FnMut(&mut EventMetadata)) {
172 match self {
173 Self::Logs(logs) => {
174 for log in logs {
175 f(log.metadata_mut());
176 }
177 }
178 Self::Metrics(metrics) => {
179 for metric in metrics {
180 f(metric.metadata_mut());
181 }
182 }
183 Self::Traces(traces) => {
184 for trace in traces {
185 f(trace.metadata_mut());
186 }
187 }
188 }
189 }
190}
191
192impl From<Event> for EventArray {
193 fn from(event: Event) -> Self {
194 match event {
195 Event::Log(log) => Self::Logs(vec![log]),
196 Event::Metric(metric) => Self::Metrics(vec![metric]),
197 Event::Trace(trace) => Self::Traces(vec![trace]),
198 }
199 }
200}
201
202impl From<LogEvent> for EventArray {
203 fn from(log: LogEvent) -> Self {
204 Event::from(log).into()
205 }
206}
207
208impl From<Metric> for EventArray {
209 fn from(metric: Metric) -> Self {
210 Event::from(metric).into()
211 }
212}
213
214impl From<TraceEvent> for EventArray {
215 fn from(trace: TraceEvent) -> Self {
216 Event::from(trace).into()
217 }
218}
219
220impl From<LogArray> for EventArray {
221 fn from(array: LogArray) -> Self {
222 Self::Logs(array)
223 }
224}
225
226impl From<MetricArray> for EventArray {
227 fn from(array: MetricArray) -> Self {
228 Self::Metrics(array)
229 }
230}
231
232impl AddBatchNotifier for EventArray {
233 fn add_batch_notifier(&mut self, batch: BatchNotifier) {
234 match self {
235 Self::Logs(array) => array
236 .iter_mut()
237 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
238 Self::Metrics(array) => array
239 .iter_mut()
240 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
241 Self::Traces(array) => array
242 .iter_mut()
243 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
244 }
245 }
246}
247
248impl ByteSizeOf for EventArray {
249 fn allocated_bytes(&self) -> usize {
250 match self {
251 Self::Logs(a) => a.allocated_bytes(),
252 Self::Metrics(a) => a.allocated_bytes(),
253 Self::Traces(a) => a.allocated_bytes(),
254 }
255 }
256}
257
258impl EstimatedJsonEncodedSizeOf for EventArray {
259 fn estimated_json_encoded_size_of(&self) -> JsonSize {
260 match self {
261 Self::Logs(v) => v.estimated_json_encoded_size_of(),
262 Self::Traces(v) => v.estimated_json_encoded_size_of(),
263 Self::Metrics(v) => v.estimated_json_encoded_size_of(),
264 }
265 }
266}
267
268impl EventCount for EventArray {
269 fn event_count(&self) -> usize {
270 match self {
271 Self::Logs(a) => a.len(),
272 Self::Metrics(a) => a.len(),
273 Self::Traces(a) => a.len(),
274 }
275 }
276}
277
278impl EventContainer for EventArray {
279 type IntoIter = EventArrayIntoIter;
280
281 fn len(&self) -> usize {
282 match self {
283 Self::Logs(a) => a.len(),
284 Self::Metrics(a) => a.len(),
285 Self::Traces(a) => a.len(),
286 }
287 }
288
289 fn into_events(self) -> Self::IntoIter {
290 match self {
291 Self::Logs(a) => EventArrayIntoIter::Logs(a.into_iter()),
292 Self::Metrics(a) => EventArrayIntoIter::Metrics(a.into_iter()),
293 Self::Traces(a) => EventArrayIntoIter::Traces(a.into_iter()),
294 }
295 }
296}
297
298impl EventDataEq for EventArray {
299 fn event_data_eq(&self, other: &Self) -> bool {
300 match (self, other) {
301 (Self::Logs(a), Self::Logs(b)) => a.event_data_eq(b),
302 (Self::Metrics(a), Self::Metrics(b)) => a.event_data_eq(b),
303 (Self::Traces(a), Self::Traces(b)) => a.event_data_eq(b),
304 _ => false,
305 }
306 }
307}
308
309impl Finalizable for EventArray {
310 fn take_finalizers(&mut self) -> EventFinalizers {
311 match self {
312 Self::Logs(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
313 Self::Metrics(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
314 Self::Traces(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
315 }
316 }
317}
318
319#[cfg(test)]
320impl Arbitrary for EventArray {
321 fn arbitrary(g: &mut Gen) -> Self {
322 let len = u8::arbitrary(g) as usize;
323 let choice: u8 = u8::arbitrary(g);
324 if choice.is_multiple_of(2) {
327 let mut logs = Vec::new();
328 for _ in 0..len {
329 logs.push(LogEvent::arbitrary(g));
330 }
331 EventArray::Logs(logs)
332 } else {
333 let mut metrics = Vec::new();
334 for _ in 0..len {
335 metrics.push(Metric::arbitrary(g));
336 }
337 EventArray::Metrics(metrics)
338 }
339 }
340
341 fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
342 match self {
343 EventArray::Logs(logs) => Box::new(logs.shrink().map(EventArray::Logs)),
344 EventArray::Metrics(metrics) => Box::new(metrics.shrink().map(EventArray::Metrics)),
345 EventArray::Traces(traces) => Box::new(traces.shrink().map(EventArray::Traces)),
346 }
347 }
348}
349
350#[derive(Debug)]
352pub enum EventArrayIter<'a> {
353 Logs(slice::Iter<'a, LogEvent>),
355 Metrics(slice::Iter<'a, Metric>),
357 Traces(slice::Iter<'a, TraceEvent>),
359}
360
361impl<'a> Iterator for EventArrayIter<'a> {
362 type Item = EventRef<'a>;
363
364 fn next(&mut self) -> Option<Self::Item> {
365 match self {
366 Self::Logs(i) => i.next().map(EventRef::from),
367 Self::Metrics(i) => i.next().map(EventRef::from),
368 Self::Traces(i) => i.next().map(EventRef::from),
369 }
370 }
371}
372
373#[derive(Debug)]
375pub enum EventArrayIterMut<'a> {
376 Logs(slice::IterMut<'a, LogEvent>),
378 Metrics(slice::IterMut<'a, Metric>),
380 Traces(slice::IterMut<'a, TraceEvent>),
382}
383
384impl<'a> Iterator for EventArrayIterMut<'a> {
385 type Item = EventMutRef<'a>;
386
387 fn next(&mut self) -> Option<Self::Item> {
388 match self {
389 Self::Logs(i) => i.next().map(EventMutRef::from),
390 Self::Metrics(i) => i.next().map(EventMutRef::from),
391 Self::Traces(i) => i.next().map(EventMutRef::from),
392 }
393 }
394}
395
396#[derive(Debug)]
398pub enum EventArrayIntoIter {
399 Logs(vec::IntoIter<LogEvent>),
401 Metrics(vec::IntoIter<Metric>),
403 Traces(vec::IntoIter<TraceEvent>),
405}
406
407impl Iterator for EventArrayIntoIter {
408 type Item = Event;
409
410 fn next(&mut self) -> Option<Self::Item> {
411 match self {
412 Self::Logs(i) => i.next().map(Into::into),
413 Self::Metrics(i) => i.next().map(Into::into),
414 Self::Traces(i) => i.next().map(Event::Trace),
415 }
416 }
417}
418
419struct TypedArrayIterMut<'a, T>(Option<slice::IterMut<'a, T>>);
420
421impl<'a, T> Iterator for TypedArrayIterMut<'a, T> {
422 type Item = &'a mut T;
423 fn next(&mut self) -> Option<Self::Item> {
424 self.0.as_mut().and_then(Iterator::next)
425 }
426}
427
428#[derive(Debug, Default)]
433pub struct EventArrayBuffer {
434 buffer: Option<EventArray>,
435 max_size: usize,
436}
437
438impl EventArrayBuffer {
439 fn new(max_size: Option<usize>) -> Self {
440 let max_size = max_size.unwrap_or(usize::MAX);
441 let buffer = None;
442 Self { buffer, max_size }
443 }
444
445 #[must_use]
446 fn push(&mut self, event: Event) -> Option<EventArray> {
447 match (event, &mut self.buffer) {
448 (Event::Log(event), Some(EventArray::Logs(array))) if array.len() < self.max_size => {
449 array.push(event);
450 None
451 }
452 (Event::Metric(event), Some(EventArray::Metrics(array)))
453 if array.len() < self.max_size =>
454 {
455 array.push(event);
456 None
457 }
458 (Event::Trace(event), Some(EventArray::Traces(array)))
459 if array.len() < self.max_size =>
460 {
461 array.push(event);
462 None
463 }
464 (event, current) => current.replace(EventArray::from(event)),
465 }
466 }
467
468 fn take(&mut self) -> Option<EventArray> {
469 self.buffer.take()
470 }
471}
472
473pub fn events_into_arrays(
476 events: impl IntoIterator<Item = Event>,
477 max_size: Option<usize>,
478) -> impl Iterator<Item = EventArray> {
479 IntoEventArraysIter {
480 inner: events.into_iter().fuse(),
481 current: EventArrayBuffer::new(max_size),
482 }
483}
484
485pub struct IntoEventArraysIter<I> {
487 inner: iter::Fuse<I>,
488 current: EventArrayBuffer,
489}
490
491impl<I: Iterator<Item = Event>> Iterator for IntoEventArraysIter<I> {
492 type Item = EventArray;
493 fn next(&mut self) -> Option<Self::Item> {
494 for event in self.inner.by_ref() {
495 if let Some(array) = self.current.push(event) {
496 return Some(array);
497 }
498 }
499 self.current.take()
500 }
501}