1#![deny(missing_docs)]
2use std::{iter, slice, sync::Arc, vec};
6
7use futures::{stream, Stream};
8#[cfg(test)]
9use quickcheck::{Arbitrary, Gen};
10use vector_buffers::EventCount;
11use vector_common::{
12 byte_size_of::ByteSizeOf,
13 config::ComponentKey,
14 finalization::{AddBatchNotifier, BatchNotifier, EventFinalizers, Finalizable},
15 json_size::JsonSize,
16};
17
18use super::{
19 EstimatedJsonEncodedSizeOf, Event, EventDataEq, EventFinalizer, EventMutRef, EventRef,
20 LogEvent, Metric, TraceEvent,
21};
22
23pub type LogArray = Vec<LogEvent>;
25
26pub type TraceArray = Vec<TraceEvent>;
28
29pub type MetricArray = Vec<Metric>;
31
32pub trait EventContainer: ByteSizeOf + EstimatedJsonEncodedSizeOf {
37 type IntoIter: Iterator<Item = Event>;
39
40 fn len(&self) -> usize;
42
43 fn is_empty(&self) -> bool {
45 self.len() == 0
46 }
47
48 fn into_events(self) -> Self::IntoIter;
50}
51
52pub fn into_event_stream(container: impl EventContainer) -> impl Stream<Item = Event> + Unpin {
58 stream::iter(container.into_events())
59}
60
61impl EventContainer for Event {
62 type IntoIter = iter::Once<Event>;
63
64 fn len(&self) -> usize {
65 1
66 }
67
68 fn is_empty(&self) -> bool {
69 false
70 }
71
72 fn into_events(self) -> Self::IntoIter {
73 iter::once(self)
74 }
75}
76
77impl EventContainer for LogEvent {
78 type IntoIter = iter::Once<Event>;
79
80 fn len(&self) -> usize {
81 1
82 }
83
84 fn is_empty(&self) -> bool {
85 false
86 }
87
88 fn into_events(self) -> Self::IntoIter {
89 iter::once(self.into())
90 }
91}
92
93impl EventContainer for Metric {
94 type IntoIter = iter::Once<Event>;
95
96 fn len(&self) -> usize {
97 1
98 }
99
100 fn is_empty(&self) -> bool {
101 false
102 }
103
104 fn into_events(self) -> Self::IntoIter {
105 iter::once(self.into())
106 }
107}
108
109impl EventContainer for LogArray {
110 type IntoIter = iter::Map<vec::IntoIter<LogEvent>, fn(LogEvent) -> Event>;
111
112 fn len(&self) -> usize {
113 self.len()
114 }
115
116 fn into_events(self) -> Self::IntoIter {
117 self.into_iter().map(Into::into)
118 }
119}
120
121impl EventContainer for MetricArray {
122 type IntoIter = iter::Map<vec::IntoIter<Metric>, fn(Metric) -> Event>;
123
124 fn len(&self) -> usize {
125 self.len()
126 }
127
128 fn into_events(self) -> Self::IntoIter {
129 self.into_iter().map(Into::into)
130 }
131}
132
133#[derive(Clone, Debug, PartialEq)]
135pub enum EventArray {
136 Logs(LogArray),
138 Metrics(MetricArray),
140 Traces(TraceArray),
142}
143
144impl EventArray {
145 pub fn set_output_id(&mut self, output_id: &Arc<ComponentKey>) {
147 match self {
148 EventArray::Logs(logs) => {
149 for log in logs {
150 log.metadata_mut().set_source_id(Arc::clone(output_id));
151 }
152 }
153 EventArray::Metrics(metrics) => {
154 for metric in metrics {
155 metric.metadata_mut().set_source_id(Arc::clone(output_id));
156 }
157 }
158 EventArray::Traces(traces) => {
159 for trace in traces {
160 trace.metadata_mut().set_source_id(Arc::clone(output_id));
161 }
162 }
163 }
164 }
165
166 pub fn set_source_type(&mut self, source_type: &'static str) {
168 if let EventArray::Metrics(metrics) = self {
169 for metric in metrics {
170 metric.metadata_mut().set_source_type(source_type);
171 }
172 }
173 }
174
175 pub fn iter_events(&self) -> impl Iterator<Item = EventRef> {
177 match self {
178 Self::Logs(array) => EventArrayIter::Logs(array.iter()),
179 Self::Metrics(array) => EventArrayIter::Metrics(array.iter()),
180 Self::Traces(array) => EventArrayIter::Traces(array.iter()),
181 }
182 }
183
184 pub fn iter_events_mut(&mut self) -> impl Iterator<Item = EventMutRef> {
186 match self {
187 Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()),
188 Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()),
189 Self::Traces(array) => EventArrayIterMut::Traces(array.iter_mut()),
190 }
191 }
192
193 pub fn iter_logs_mut(&mut self) -> impl Iterator<Item = &mut LogEvent> {
195 match self {
196 Self::Logs(array) => TypedArrayIterMut(Some(array.iter_mut())),
197 _ => TypedArrayIterMut(None),
198 }
199 }
200}
201
202impl From<Event> for EventArray {
203 fn from(event: Event) -> Self {
204 match event {
205 Event::Log(log) => Self::Logs(vec![log]),
206 Event::Metric(metric) => Self::Metrics(vec![metric]),
207 Event::Trace(trace) => Self::Traces(vec![trace]),
208 }
209 }
210}
211
212impl From<LogEvent> for EventArray {
213 fn from(log: LogEvent) -> Self {
214 Event::from(log).into()
215 }
216}
217
218impl From<Metric> for EventArray {
219 fn from(metric: Metric) -> Self {
220 Event::from(metric).into()
221 }
222}
223
224impl From<TraceEvent> for EventArray {
225 fn from(trace: TraceEvent) -> Self {
226 Event::from(trace).into()
227 }
228}
229
230impl From<LogArray> for EventArray {
231 fn from(array: LogArray) -> Self {
232 Self::Logs(array)
233 }
234}
235
236impl From<MetricArray> for EventArray {
237 fn from(array: MetricArray) -> Self {
238 Self::Metrics(array)
239 }
240}
241
242impl AddBatchNotifier for EventArray {
243 fn add_batch_notifier(&mut self, batch: BatchNotifier) {
244 match self {
245 Self::Logs(array) => array
246 .iter_mut()
247 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
248 Self::Metrics(array) => array
249 .iter_mut()
250 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
251 Self::Traces(array) => array
252 .iter_mut()
253 .for_each(|item| item.add_finalizer(EventFinalizer::new(batch.clone()))),
254 }
255 }
256}
257
258impl ByteSizeOf for EventArray {
259 fn allocated_bytes(&self) -> usize {
260 match self {
261 Self::Logs(a) => a.allocated_bytes(),
262 Self::Metrics(a) => a.allocated_bytes(),
263 Self::Traces(a) => a.allocated_bytes(),
264 }
265 }
266}
267
268impl EstimatedJsonEncodedSizeOf for EventArray {
269 fn estimated_json_encoded_size_of(&self) -> JsonSize {
270 match self {
271 Self::Logs(v) => v.estimated_json_encoded_size_of(),
272 Self::Traces(v) => v.estimated_json_encoded_size_of(),
273 Self::Metrics(v) => v.estimated_json_encoded_size_of(),
274 }
275 }
276}
277
278impl EventCount for EventArray {
279 fn event_count(&self) -> usize {
280 match self {
281 Self::Logs(a) => a.len(),
282 Self::Metrics(a) => a.len(),
283 Self::Traces(a) => a.len(),
284 }
285 }
286}
287
288impl EventContainer for EventArray {
289 type IntoIter = EventArrayIntoIter;
290
291 fn len(&self) -> usize {
292 match self {
293 Self::Logs(a) => a.len(),
294 Self::Metrics(a) => a.len(),
295 Self::Traces(a) => a.len(),
296 }
297 }
298
299 fn into_events(self) -> Self::IntoIter {
300 match self {
301 Self::Logs(a) => EventArrayIntoIter::Logs(a.into_iter()),
302 Self::Metrics(a) => EventArrayIntoIter::Metrics(a.into_iter()),
303 Self::Traces(a) => EventArrayIntoIter::Traces(a.into_iter()),
304 }
305 }
306}
307
308impl EventDataEq for EventArray {
309 fn event_data_eq(&self, other: &Self) -> bool {
310 match (self, other) {
311 (Self::Logs(a), Self::Logs(b)) => a.event_data_eq(b),
312 (Self::Metrics(a), Self::Metrics(b)) => a.event_data_eq(b),
313 (Self::Traces(a), Self::Traces(b)) => a.event_data_eq(b),
314 _ => false,
315 }
316 }
317}
318
319impl Finalizable for EventArray {
320 fn take_finalizers(&mut self) -> EventFinalizers {
321 match self {
322 Self::Logs(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
323 Self::Metrics(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
324 Self::Traces(a) => a.iter_mut().map(Finalizable::take_finalizers).collect(),
325 }
326 }
327}
328
329#[cfg(test)]
330impl Arbitrary for EventArray {
331 fn arbitrary(g: &mut Gen) -> Self {
332 let len = u8::arbitrary(g) as usize;
333 let choice: u8 = u8::arbitrary(g);
334 if choice % 2 == 0 {
337 let mut logs = Vec::new();
338 for _ in 0..len {
339 logs.push(LogEvent::arbitrary(g));
340 }
341 EventArray::Logs(logs)
342 } else {
343 let mut metrics = Vec::new();
344 for _ in 0..len {
345 metrics.push(Metric::arbitrary(g));
346 }
347 EventArray::Metrics(metrics)
348 }
349 }
350
351 fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
352 match self {
353 EventArray::Logs(logs) => Box::new(logs.shrink().map(EventArray::Logs)),
354 EventArray::Metrics(metrics) => Box::new(metrics.shrink().map(EventArray::Metrics)),
355 EventArray::Traces(traces) => Box::new(traces.shrink().map(EventArray::Traces)),
356 }
357 }
358}
359
360#[derive(Debug)]
362pub enum EventArrayIter<'a> {
363 Logs(slice::Iter<'a, LogEvent>),
365 Metrics(slice::Iter<'a, Metric>),
367 Traces(slice::Iter<'a, TraceEvent>),
369}
370
371impl<'a> Iterator for EventArrayIter<'a> {
372 type Item = EventRef<'a>;
373
374 fn next(&mut self) -> Option<Self::Item> {
375 match self {
376 Self::Logs(i) => i.next().map(EventRef::from),
377 Self::Metrics(i) => i.next().map(EventRef::from),
378 Self::Traces(i) => i.next().map(EventRef::from),
379 }
380 }
381}
382
383#[derive(Debug)]
385pub enum EventArrayIterMut<'a> {
386 Logs(slice::IterMut<'a, LogEvent>),
388 Metrics(slice::IterMut<'a, Metric>),
390 Traces(slice::IterMut<'a, TraceEvent>),
392}
393
394impl<'a> Iterator for EventArrayIterMut<'a> {
395 type Item = EventMutRef<'a>;
396
397 fn next(&mut self) -> Option<Self::Item> {
398 match self {
399 Self::Logs(i) => i.next().map(EventMutRef::from),
400 Self::Metrics(i) => i.next().map(EventMutRef::from),
401 Self::Traces(i) => i.next().map(EventMutRef::from),
402 }
403 }
404}
405
406#[derive(Debug)]
408pub enum EventArrayIntoIter {
409 Logs(vec::IntoIter<LogEvent>),
411 Metrics(vec::IntoIter<Metric>),
413 Traces(vec::IntoIter<TraceEvent>),
415}
416
417impl Iterator for EventArrayIntoIter {
418 type Item = Event;
419
420 fn next(&mut self) -> Option<Self::Item> {
421 match self {
422 Self::Logs(i) => i.next().map(Into::into),
423 Self::Metrics(i) => i.next().map(Into::into),
424 Self::Traces(i) => i.next().map(Event::Trace),
425 }
426 }
427}
428
429struct TypedArrayIterMut<'a, T>(Option<slice::IterMut<'a, T>>);
430
431impl<'a, T> Iterator for TypedArrayIterMut<'a, T> {
432 type Item = &'a mut T;
433 fn next(&mut self) -> Option<Self::Item> {
434 self.0.as_mut().and_then(Iterator::next)
435 }
436}
437
438#[derive(Debug, Default)]
443pub struct EventArrayBuffer {
444 buffer: Option<EventArray>,
445 max_size: usize,
446}
447
448impl EventArrayBuffer {
449 fn new(max_size: Option<usize>) -> Self {
450 let max_size = max_size.unwrap_or(usize::MAX);
451 let buffer = None;
452 Self { buffer, max_size }
453 }
454
455 #[must_use]
456 fn push(&mut self, event: Event) -> Option<EventArray> {
457 match (event, &mut self.buffer) {
458 (Event::Log(event), Some(EventArray::Logs(array))) if array.len() < self.max_size => {
459 array.push(event);
460 None
461 }
462 (Event::Metric(event), Some(EventArray::Metrics(array)))
463 if array.len() < self.max_size =>
464 {
465 array.push(event);
466 None
467 }
468 (Event::Trace(event), Some(EventArray::Traces(array)))
469 if array.len() < self.max_size =>
470 {
471 array.push(event);
472 None
473 }
474 (event, current) => current.replace(EventArray::from(event)),
475 }
476 }
477
478 fn take(&mut self) -> Option<EventArray> {
479 self.buffer.take()
480 }
481}
482
483pub fn events_into_arrays(
486 events: impl IntoIterator<Item = Event>,
487 max_size: Option<usize>,
488) -> impl Iterator<Item = EventArray> {
489 IntoEventArraysIter {
490 inner: events.into_iter().fuse(),
491 current: EventArrayBuffer::new(max_size),
492 }
493}
494
495pub struct IntoEventArraysIter<I> {
497 inner: iter::Fuse<I>,
498 current: EventArrayBuffer,
499}
500
501impl<I: Iterator<Item = Event>> Iterator for IntoEventArraysIter<I> {
502 type Item = EventArray;
503 fn next(&mut self) -> Option<Self::Item> {
504 for event in self.inner.by_ref() {
505 if let Some(array) = self.current.push(event) {
506 return Some(array);
507 }
508 }
509 self.current.take()
510 }
511}