1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{EventArray, EventContainer, LogArray, MetricArray, TraceArray, into_event_stream};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6 BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7 Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17 EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey, finalization,
18 internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::{LogNamespace, OutputId};
25
26pub mod array;
27pub mod discriminant;
28mod estimated_json_encoded_size_of;
29mod log_event;
30#[cfg(feature = "lua")]
31pub mod lua;
32pub mod merge_state;
33mod metadata;
34pub mod metric;
35pub mod proto;
36mod r#ref;
37mod ser;
38#[cfg(test)]
39mod test;
40mod trace;
41pub mod util;
42#[cfg(feature = "vrl")]
43mod vrl_target;
44
/// The literal string `"_partial"`.
///
/// NOTE(review): presumably the field name used to flag partial events —
/// confirm against the callers that read this constant.
pub const PARTIAL: &str = "_partial";
46
47#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49#[allow(clippy::large_enum_variant)]
50pub enum Event {
51 Log(LogEvent),
52 Metric(Metric),
53 Trace(TraceEvent),
54}
55
56impl ByteSizeOf for Event {
57 fn allocated_bytes(&self) -> usize {
58 match self {
59 Event::Log(log_event) => log_event.allocated_bytes(),
60 Event::Metric(metric_event) => metric_event.allocated_bytes(),
61 Event::Trace(trace_event) => trace_event.allocated_bytes(),
62 }
63 }
64}
65
66impl EstimatedJsonEncodedSizeOf for Event {
67 fn estimated_json_encoded_size_of(&self) -> JsonSize {
68 match self {
69 Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
70 Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
71 Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
72 }
73 }
74}
75
76impl EventCount for Event {
77 fn event_count(&self) -> usize {
78 1
79 }
80}
81
82impl Finalizable for Event {
83 fn take_finalizers(&mut self) -> EventFinalizers {
84 match self {
85 Event::Log(log_event) => log_event.take_finalizers(),
86 Event::Metric(metric) => metric.take_finalizers(),
87 Event::Trace(trace_event) => trace_event.take_finalizers(),
88 }
89 }
90}
91
92impl GetEventCountTags for Event {
93 fn get_tags(&self) -> TaggedEventsSent {
94 match self {
95 Event::Log(log) => log.get_tags(),
96 Event::Metric(metric) => metric.get_tags(),
97 Event::Trace(trace) => trace.get_tags(),
98 }
99 }
100}
101
102impl Event {
103 pub fn as_log(&self) -> &LogEvent {
109 match self {
110 Event::Log(log) => log,
111 _ => panic!("Failed type coercion, {self:?} is not a log event"),
112 }
113 }
114
115 pub fn as_mut_log(&mut self) -> &mut LogEvent {
121 match self {
122 Event::Log(log) => log,
123 _ => panic!("Failed type coercion, {self:?} is not a log event"),
124 }
125 }
126
127 pub fn into_log(self) -> LogEvent {
133 match self {
134 Event::Log(log) => log,
135 _ => panic!("Failed type coercion, {self:?} is not a log event"),
136 }
137 }
138
139 pub fn try_into_log(self) -> Option<LogEvent> {
143 match self {
144 Event::Log(log) => Some(log),
145 _ => None,
146 }
147 }
148
149 pub fn maybe_as_log(&self) -> Option<&LogEvent> {
153 match self {
154 Event::Log(log) => Some(log),
155 _ => None,
156 }
157 }
158
159 pub fn as_metric(&self) -> &Metric {
165 match self {
166 Event::Metric(metric) => metric,
167 _ => panic!("Failed type coercion, {self:?} is not a metric"),
168 }
169 }
170
171 pub fn as_mut_metric(&mut self) -> &mut Metric {
177 match self {
178 Event::Metric(metric) => metric,
179 _ => panic!("Failed type coercion, {self:?} is not a metric"),
180 }
181 }
182
183 pub fn into_metric(self) -> Metric {
189 match self {
190 Event::Metric(metric) => metric,
191 _ => panic!("Failed type coercion, {self:?} is not a metric"),
192 }
193 }
194
195 pub fn try_into_metric(self) -> Option<Metric> {
199 match self {
200 Event::Metric(metric) => Some(metric),
201 _ => None,
202 }
203 }
204
205 pub fn as_trace(&self) -> &TraceEvent {
211 match self {
212 Event::Trace(trace) => trace,
213 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
214 }
215 }
216
217 pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
223 match self {
224 Event::Trace(trace) => trace,
225 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
226 }
227 }
228
229 pub fn into_trace(self) -> TraceEvent {
235 match self {
236 Event::Trace(trace) => trace,
237 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
238 }
239 }
240
241 pub fn try_into_trace(self) -> Option<TraceEvent> {
245 match self {
246 Event::Trace(trace) => Some(trace),
247 _ => None,
248 }
249 }
250
251 pub fn metadata(&self) -> &EventMetadata {
252 match self {
253 Self::Log(log) => log.metadata(),
254 Self::Metric(metric) => metric.metadata(),
255 Self::Trace(trace) => trace.metadata(),
256 }
257 }
258
259 pub fn metadata_mut(&mut self) -> &mut EventMetadata {
260 match self {
261 Self::Log(log) => log.metadata_mut(),
262 Self::Metric(metric) => metric.metadata_mut(),
263 Self::Trace(trace) => trace.metadata_mut(),
264 }
265 }
266
267 pub fn into_metadata(self) -> EventMetadata {
269 match self {
270 Self::Log(log) => log.into_parts().1,
271 Self::Metric(metric) => metric.into_parts().2,
272 Self::Trace(trace) => trace.into_parts().1,
273 }
274 }
275
276 #[must_use]
277 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
278 match self {
279 Self::Log(log) => log.with_batch_notifier(batch).into(),
280 Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
281 Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
282 }
283 }
284
285 #[must_use]
286 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
287 match self {
288 Self::Log(log) => log.with_batch_notifier_option(batch).into(),
289 Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
290 Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
291 }
292 }
293
294 #[must_use]
296 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
297 self.metadata().source_id()
298 }
299
300 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
302 self.metadata_mut().set_source_id(source_id);
303 }
304
305 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
307 self.metadata_mut().set_upstream_id(upstream_id);
308 }
309
310 pub fn set_source_type(&mut self, source_type: &'static str) {
312 self.metadata_mut().set_source_type(source_type);
313 }
314
315 #[must_use]
317 pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
318 self.metadata_mut().set_source_id(source_id);
319 self
320 }
321
322 #[must_use]
324 pub fn with_source_type(mut self, source_type: &'static str) -> Self {
325 self.metadata_mut().set_source_type(source_type);
326 self
327 }
328
329 #[must_use]
331 pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
332 self.metadata_mut().set_upstream_id(upstream_id);
333 self
334 }
335
336 pub fn from_json_value(
341 value: serde_json::Value,
342 log_namespace: LogNamespace,
343 ) -> crate::Result<Self> {
344 match log_namespace {
345 LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
346 LogNamespace::Legacy => match value {
347 serde_json::Value::Object(fields) => Ok(LogEvent::from(
348 fields
349 .into_iter()
350 .map(|(k, v)| (k.into(), v.into()))
351 .collect::<ObjectMap>(),
352 )
353 .into()),
354 _ => Err(crate::Error::from(
355 "Attempted to convert non-Object JSON into an Event.",
356 )),
357 },
358 }
359 }
360}
361
362impl EventDataEq for Event {
363 fn event_data_eq(&self, other: &Self) -> bool {
364 match (self, other) {
365 (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
366 (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
367 (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
368 _ => false,
369 }
370 }
371}
372
373impl finalization::AddBatchNotifier for Event {
374 fn add_batch_notifier(&mut self, batch: BatchNotifier) {
375 let finalizer = EventFinalizer::new(batch);
376 match self {
377 Self::Log(log) => log.add_finalizer(finalizer),
378 Self::Metric(metric) => metric.add_finalizer(finalizer),
379 Self::Trace(trace) => trace.add_finalizer(finalizer),
380 }
381 }
382}
383
384impl TryInto<serde_json::Value> for Event {
385 type Error = serde_json::Error;
386
387 fn try_into(self) -> Result<serde_json::Value, Self::Error> {
388 match self {
389 Event::Log(fields) => serde_json::to_value(fields),
390 Event::Metric(metric) => serde_json::to_value(metric),
391 Event::Trace(fields) => serde_json::to_value(fields),
392 }
393 }
394}
395
396impl From<proto::StatisticKind> for StatisticKind {
397 fn from(kind: proto::StatisticKind) -> Self {
398 match kind {
399 proto::StatisticKind::Histogram => StatisticKind::Histogram,
400 proto::StatisticKind::Summary => StatisticKind::Summary,
401 }
402 }
403}
404
405impl From<metric::Sample> for proto::DistributionSample {
406 fn from(sample: metric::Sample) -> Self {
407 Self {
408 value: sample.value,
409 rate: sample.rate,
410 }
411 }
412}
413
414impl From<proto::DistributionSample> for metric::Sample {
415 fn from(sample: proto::DistributionSample) -> Self {
416 Self {
417 value: sample.value,
418 rate: sample.rate,
419 }
420 }
421}
422
423impl From<proto::HistogramBucket> for metric::Bucket {
424 fn from(bucket: proto::HistogramBucket) -> Self {
425 Self {
426 upper_limit: bucket.upper_limit,
427 count: u64::from(bucket.count),
428 }
429 }
430}
431
432impl From<metric::Bucket> for proto::HistogramBucket3 {
433 fn from(bucket: metric::Bucket) -> Self {
434 Self {
435 upper_limit: bucket.upper_limit,
436 count: bucket.count,
437 }
438 }
439}
440
441impl From<proto::HistogramBucket3> for metric::Bucket {
442 fn from(bucket: proto::HistogramBucket3) -> Self {
443 Self {
444 upper_limit: bucket.upper_limit,
445 count: bucket.count,
446 }
447 }
448}
449
450impl From<metric::Quantile> for proto::SummaryQuantile {
451 fn from(quantile: metric::Quantile) -> Self {
452 Self {
453 quantile: quantile.quantile,
454 value: quantile.value,
455 }
456 }
457}
458
459impl From<proto::SummaryQuantile> for metric::Quantile {
460 fn from(quantile: proto::SummaryQuantile) -> Self {
461 Self {
462 quantile: quantile.quantile,
463 value: quantile.value,
464 }
465 }
466}
467
468impl From<LogEvent> for Event {
469 fn from(log: LogEvent) -> Self {
470 Event::Log(log)
471 }
472}
473
474impl From<Metric> for Event {
475 fn from(metric: Metric) -> Self {
476 Event::Metric(metric)
477 }
478}
479
480impl From<TraceEvent> for Event {
481 fn from(trace: TraceEvent) -> Self {
482 Event::Trace(trace)
483 }
484}
485
486pub trait MaybeAsLogMut {
487 fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
488}
489
490impl MaybeAsLogMut for Event {
491 fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
492 match self {
493 Event::Log(log) => Some(log),
494 _ => None,
495 }
496 }
497}