1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6 BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7 Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17 byte_size_of::ByteSizeOf, config::ComponentKey, finalization, internal_event::TaggedEventsSent,
18 json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::LogNamespace;
25use crate::config::OutputId;
26
27pub mod array;
28pub mod discriminant;
29mod estimated_json_encoded_size_of;
30mod log_event;
31#[cfg(feature = "lua")]
32pub mod lua;
33pub mod merge_state;
34mod metadata;
35pub mod metric;
36pub mod proto;
37mod r#ref;
38mod ser;
39#[cfg(test)]
40mod test;
41mod trace;
42pub mod util;
43#[cfg(feature = "vrl")]
44mod vrl_target;
45
/// The literal key name `"_partial"`.
///
/// NOTE(review): presumably the field used to mark an event as a partial
/// fragment of a larger message — confirm against the callers.
pub const PARTIAL: &str = "_partial";
47
/// A single event, which is exactly one of a log, a metric, or a trace.
///
/// With serde, the variant names are rendered in `snake_case`
/// (`log`, `metric`, `trace`).
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
// NOTE(review): the size-difference lint is silenced rather than boxing the
// larger variants — presumably a deliberate trade-off; confirm.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An event carrying a [`LogEvent`].
    Log(LogEvent),
    /// An event carrying a [`Metric`].
    Metric(Metric),
    /// An event carrying a [`TraceEvent`].
    Trace(TraceEvent),
}
56
57impl ByteSizeOf for Event {
58 fn allocated_bytes(&self) -> usize {
59 match self {
60 Event::Log(log_event) => log_event.allocated_bytes(),
61 Event::Metric(metric_event) => metric_event.allocated_bytes(),
62 Event::Trace(trace_event) => trace_event.allocated_bytes(),
63 }
64 }
65}
66
67impl EstimatedJsonEncodedSizeOf for Event {
68 fn estimated_json_encoded_size_of(&self) -> JsonSize {
69 match self {
70 Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
71 Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
72 Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
73 }
74 }
75}
76
impl EventCount for Event {
    /// A single `Event` always counts as exactly one event.
    fn event_count(&self) -> usize {
        1
    }
}
82
83impl Finalizable for Event {
84 fn take_finalizers(&mut self) -> EventFinalizers {
85 match self {
86 Event::Log(log_event) => log_event.take_finalizers(),
87 Event::Metric(metric) => metric.take_finalizers(),
88 Event::Trace(trace_event) => trace_event.take_finalizers(),
89 }
90 }
91}
92
93impl GetEventCountTags for Event {
94 fn get_tags(&self) -> TaggedEventsSent {
95 match self {
96 Event::Log(log) => log.get_tags(),
97 Event::Metric(metric) => metric.get_tags(),
98 Event::Trace(trace) => trace.get_tags(),
99 }
100 }
101}
102
103impl Event {
104 pub fn as_log(&self) -> &LogEvent {
110 match self {
111 Event::Log(log) => log,
112 _ => panic!("Failed type coercion, {self:?} is not a log event"),
113 }
114 }
115
116 pub fn as_mut_log(&mut self) -> &mut LogEvent {
122 match self {
123 Event::Log(log) => log,
124 _ => panic!("Failed type coercion, {self:?} is not a log event"),
125 }
126 }
127
128 pub fn into_log(self) -> LogEvent {
134 match self {
135 Event::Log(log) => log,
136 _ => panic!("Failed type coercion, {self:?} is not a log event"),
137 }
138 }
139
140 pub fn try_into_log(self) -> Option<LogEvent> {
144 match self {
145 Event::Log(log) => Some(log),
146 _ => None,
147 }
148 }
149
150 pub fn maybe_as_log(&self) -> Option<&LogEvent> {
154 match self {
155 Event::Log(log) => Some(log),
156 _ => None,
157 }
158 }
159
160 pub fn as_metric(&self) -> &Metric {
166 match self {
167 Event::Metric(metric) => metric,
168 _ => panic!("Failed type coercion, {self:?} is not a metric"),
169 }
170 }
171
172 pub fn as_mut_metric(&mut self) -> &mut Metric {
178 match self {
179 Event::Metric(metric) => metric,
180 _ => panic!("Failed type coercion, {self:?} is not a metric"),
181 }
182 }
183
184 pub fn into_metric(self) -> Metric {
190 match self {
191 Event::Metric(metric) => metric,
192 _ => panic!("Failed type coercion, {self:?} is not a metric"),
193 }
194 }
195
196 pub fn try_into_metric(self) -> Option<Metric> {
200 match self {
201 Event::Metric(metric) => Some(metric),
202 _ => None,
203 }
204 }
205
206 pub fn as_trace(&self) -> &TraceEvent {
212 match self {
213 Event::Trace(trace) => trace,
214 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
215 }
216 }
217
218 pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
224 match self {
225 Event::Trace(trace) => trace,
226 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
227 }
228 }
229
230 pub fn into_trace(self) -> TraceEvent {
236 match self {
237 Event::Trace(trace) => trace,
238 _ => panic!("Failed type coercion, {self:?} is not a trace event"),
239 }
240 }
241
242 pub fn try_into_trace(self) -> Option<TraceEvent> {
246 match self {
247 Event::Trace(trace) => Some(trace),
248 _ => None,
249 }
250 }
251
252 pub fn metadata(&self) -> &EventMetadata {
253 match self {
254 Self::Log(log) => log.metadata(),
255 Self::Metric(metric) => metric.metadata(),
256 Self::Trace(trace) => trace.metadata(),
257 }
258 }
259
260 pub fn metadata_mut(&mut self) -> &mut EventMetadata {
261 match self {
262 Self::Log(log) => log.metadata_mut(),
263 Self::Metric(metric) => metric.metadata_mut(),
264 Self::Trace(trace) => trace.metadata_mut(),
265 }
266 }
267
268 pub fn into_metadata(self) -> EventMetadata {
270 match self {
271 Self::Log(log) => log.into_parts().1,
272 Self::Metric(metric) => metric.into_parts().2,
273 Self::Trace(trace) => trace.into_parts().1,
274 }
275 }
276
277 #[must_use]
278 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
279 match self {
280 Self::Log(log) => log.with_batch_notifier(batch).into(),
281 Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
282 Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
283 }
284 }
285
286 #[must_use]
287 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
288 match self {
289 Self::Log(log) => log.with_batch_notifier_option(batch).into(),
290 Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
291 Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
292 }
293 }
294
295 #[must_use]
297 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
298 self.metadata().source_id()
299 }
300
301 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
303 self.metadata_mut().set_source_id(source_id);
304 }
305
306 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
308 self.metadata_mut().set_upstream_id(upstream_id);
309 }
310
311 pub fn set_source_type(&mut self, source_type: &'static str) {
313 self.metadata_mut().set_source_type(source_type);
314 }
315
316 #[must_use]
318 pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
319 self.metadata_mut().set_source_id(source_id);
320 self
321 }
322
323 #[must_use]
325 pub fn with_source_type(mut self, source_type: &'static str) -> Self {
326 self.metadata_mut().set_source_type(source_type);
327 self
328 }
329
330 #[must_use]
332 pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
333 self.metadata_mut().set_upstream_id(upstream_id);
334 self
335 }
336
337 pub fn from_json_value(
342 value: serde_json::Value,
343 log_namespace: LogNamespace,
344 ) -> crate::Result<Self> {
345 match log_namespace {
346 LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
347 LogNamespace::Legacy => match value {
348 serde_json::Value::Object(fields) => Ok(LogEvent::from(
349 fields
350 .into_iter()
351 .map(|(k, v)| (k.into(), v.into()))
352 .collect::<ObjectMap>(),
353 )
354 .into()),
355 _ => Err(crate::Error::from(
356 "Attempted to convert non-Object JSON into an Event.",
357 )),
358 },
359 }
360 }
361}
362
363impl EventDataEq for Event {
364 fn event_data_eq(&self, other: &Self) -> bool {
365 match (self, other) {
366 (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
367 (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
368 (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
369 _ => false,
370 }
371 }
372}
373
374impl finalization::AddBatchNotifier for Event {
375 fn add_batch_notifier(&mut self, batch: BatchNotifier) {
376 let finalizer = EventFinalizer::new(batch);
377 match self {
378 Self::Log(log) => log.add_finalizer(finalizer),
379 Self::Metric(metric) => metric.add_finalizer(finalizer),
380 Self::Trace(trace) => trace.add_finalizer(finalizer),
381 }
382 }
383}
384
385impl TryInto<serde_json::Value> for Event {
386 type Error = serde_json::Error;
387
388 fn try_into(self) -> Result<serde_json::Value, Self::Error> {
389 match self {
390 Event::Log(fields) => serde_json::to_value(fields),
391 Event::Metric(metric) => serde_json::to_value(metric),
392 Event::Trace(fields) => serde_json::to_value(fields),
393 }
394 }
395}
396
397impl From<proto::StatisticKind> for StatisticKind {
398 fn from(kind: proto::StatisticKind) -> Self {
399 match kind {
400 proto::StatisticKind::Histogram => StatisticKind::Histogram,
401 proto::StatisticKind::Summary => StatisticKind::Summary,
402 }
403 }
404}
405
406impl From<metric::Sample> for proto::DistributionSample {
407 fn from(sample: metric::Sample) -> Self {
408 Self {
409 value: sample.value,
410 rate: sample.rate,
411 }
412 }
413}
414
415impl From<proto::DistributionSample> for metric::Sample {
416 fn from(sample: proto::DistributionSample) -> Self {
417 Self {
418 value: sample.value,
419 rate: sample.rate,
420 }
421 }
422}
423
424impl From<proto::HistogramBucket> for metric::Bucket {
425 fn from(bucket: proto::HistogramBucket) -> Self {
426 Self {
427 upper_limit: bucket.upper_limit,
428 count: u64::from(bucket.count),
429 }
430 }
431}
432
433impl From<metric::Bucket> for proto::HistogramBucket3 {
434 fn from(bucket: metric::Bucket) -> Self {
435 Self {
436 upper_limit: bucket.upper_limit,
437 count: bucket.count,
438 }
439 }
440}
441
442impl From<proto::HistogramBucket3> for metric::Bucket {
443 fn from(bucket: proto::HistogramBucket3) -> Self {
444 Self {
445 upper_limit: bucket.upper_limit,
446 count: bucket.count,
447 }
448 }
449}
450
451impl From<metric::Quantile> for proto::SummaryQuantile {
452 fn from(quantile: metric::Quantile) -> Self {
453 Self {
454 quantile: quantile.quantile,
455 value: quantile.value,
456 }
457 }
458}
459
460impl From<proto::SummaryQuantile> for metric::Quantile {
461 fn from(quantile: proto::SummaryQuantile) -> Self {
462 Self {
463 quantile: quantile.quantile,
464 value: quantile.value,
465 }
466 }
467}
468
469impl From<LogEvent> for Event {
470 fn from(log: LogEvent) -> Self {
471 Event::Log(log)
472 }
473}
474
475impl From<Metric> for Event {
476 fn from(metric: Metric) -> Self {
477 Event::Metric(metric)
478 }
479}
480
481impl From<TraceEvent> for Event {
482 fn from(trace: TraceEvent) -> Self {
483 Event::Trace(trace)
484 }
485}
486
/// Implemented by types that may be able to hand out a mutable reference to
/// a [`LogEvent`].
pub trait MaybeAsLogMut {
    /// Returns a mutable reference to the log event, or `None` when the
    /// value does not provide one.
    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
}
490
491impl MaybeAsLogMut for Event {
492 fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
493 match self {
494 Event::Log(log) => Some(log),
495 _ => None,
496 }
497 }
498}