vector_core/event/
mod.rs

1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6    BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7    Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17    byte_size_of::ByteSizeOf, config::ComponentKey, finalization, internal_event::TaggedEventsSent,
18    json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::LogNamespace;
25use crate::config::OutputId;
26
27pub mod array;
28pub mod discriminant;
29mod estimated_json_encoded_size_of;
30mod log_event;
31#[cfg(feature = "lua")]
32pub mod lua;
33pub mod merge_state;
34mod metadata;
35pub mod metric;
36pub mod proto;
37mod r#ref;
38mod ser;
39#[cfg(test)]
40mod test;
41mod trace;
42pub mod util;
43#[cfg(feature = "vrl")]
44mod vrl_target;
45
/// The literal string `"_partial"`.
///
/// NOTE(review): presumably the key used to flag partial events — confirm against usages.
pub const PARTIAL: &str = "_partial";
47
/// A single event, which is exactly one of a log, a metric, or a trace.
///
/// When (de)serialized with serde, the variant names are rendered in `snake_case`.
#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
// NOTE(review): the variants are stored inline and the size lint is silenced
// instead of boxing — confirm the rationale with the owners.
#[allow(clippy::large_enum_variant)]
pub enum Event {
    /// An event carrying a [`LogEvent`].
    Log(LogEvent),
    /// An event carrying a [`Metric`].
    Metric(Metric),
    /// An event carrying a [`TraceEvent`].
    Trace(TraceEvent),
}
56
57impl ByteSizeOf for Event {
58    fn allocated_bytes(&self) -> usize {
59        match self {
60            Event::Log(log_event) => log_event.allocated_bytes(),
61            Event::Metric(metric_event) => metric_event.allocated_bytes(),
62            Event::Trace(trace_event) => trace_event.allocated_bytes(),
63        }
64    }
65}
66
67impl EstimatedJsonEncodedSizeOf for Event {
68    fn estimated_json_encoded_size_of(&self) -> JsonSize {
69        match self {
70            Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
71            Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
72            Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
73        }
74    }
75}
76
77impl EventCount for Event {
78    fn event_count(&self) -> usize {
79        1
80    }
81}
82
83impl Finalizable for Event {
84    fn take_finalizers(&mut self) -> EventFinalizers {
85        match self {
86            Event::Log(log_event) => log_event.take_finalizers(),
87            Event::Metric(metric) => metric.take_finalizers(),
88            Event::Trace(trace_event) => trace_event.take_finalizers(),
89        }
90    }
91}
92
93impl GetEventCountTags for Event {
94    fn get_tags(&self) -> TaggedEventsSent {
95        match self {
96            Event::Log(log) => log.get_tags(),
97            Event::Metric(metric) => metric.get_tags(),
98            Event::Trace(trace) => trace.get_tags(),
99        }
100    }
101}
102
103impl Event {
104    /// Return self as a `LogEvent`
105    ///
106    /// # Panics
107    ///
108    /// This function panics if self is anything other than an `Event::Log`.
109    pub fn as_log(&self) -> &LogEvent {
110        match self {
111            Event::Log(log) => log,
112            _ => panic!("Failed type coercion, {self:?} is not a log event"),
113        }
114    }
115
116    /// Return self as a mutable `LogEvent`
117    ///
118    /// # Panics
119    ///
120    /// This function panics if self is anything other than an `Event::Log`.
121    pub fn as_mut_log(&mut self) -> &mut LogEvent {
122        match self {
123            Event::Log(log) => log,
124            _ => panic!("Failed type coercion, {self:?} is not a log event"),
125        }
126    }
127
128    /// Coerces self into a `LogEvent`
129    ///
130    /// # Panics
131    ///
132    /// This function panics if self is anything other than an `Event::Log`.
133    pub fn into_log(self) -> LogEvent {
134        match self {
135            Event::Log(log) => log,
136            _ => panic!("Failed type coercion, {self:?} is not a log event"),
137        }
138    }
139
140    /// Fallibly coerces self into a `LogEvent`
141    ///
142    /// If the event is a `LogEvent`, then `Some(log_event)` is returned, otherwise `None`.
143    pub fn try_into_log(self) -> Option<LogEvent> {
144        match self {
145            Event::Log(log) => Some(log),
146            _ => None,
147        }
148    }
149
150    /// Return self as a `LogEvent` if possible
151    ///
152    /// If the event is a `LogEvent`, then `Some(&log_event)` is returned, otherwise `None`.
153    pub fn maybe_as_log(&self) -> Option<&LogEvent> {
154        match self {
155            Event::Log(log) => Some(log),
156            _ => None,
157        }
158    }
159
160    /// Return self as a `Metric`
161    ///
162    /// # Panics
163    ///
164    /// This function panics if self is anything other than an `Event::Metric`.
165    pub fn as_metric(&self) -> &Metric {
166        match self {
167            Event::Metric(metric) => metric,
168            _ => panic!("Failed type coercion, {self:?} is not a metric"),
169        }
170    }
171
172    /// Return self as a mutable `Metric`
173    ///
174    /// # Panics
175    ///
176    /// This function panics if self is anything other than an `Event::Metric`.
177    pub fn as_mut_metric(&mut self) -> &mut Metric {
178        match self {
179            Event::Metric(metric) => metric,
180            _ => panic!("Failed type coercion, {self:?} is not a metric"),
181        }
182    }
183
184    /// Coerces self into `Metric`
185    ///
186    /// # Panics
187    ///
188    /// This function panics if self is anything other than an `Event::Metric`.
189    pub fn into_metric(self) -> Metric {
190        match self {
191            Event::Metric(metric) => metric,
192            _ => panic!("Failed type coercion, {self:?} is not a metric"),
193        }
194    }
195
196    /// Fallibly coerces self into a `Metric`
197    ///
198    /// If the event is a `Metric`, then `Some(metric)` is returned, otherwise `None`.
199    pub fn try_into_metric(self) -> Option<Metric> {
200        match self {
201            Event::Metric(metric) => Some(metric),
202            _ => None,
203        }
204    }
205
206    /// Return self as a `TraceEvent`
207    ///
208    /// # Panics
209    ///
210    /// This function panics if self is anything other than an `Event::Trace`.
211    pub fn as_trace(&self) -> &TraceEvent {
212        match self {
213            Event::Trace(trace) => trace,
214            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
215        }
216    }
217
218    /// Return self as a mutable `TraceEvent`
219    ///
220    /// # Panics
221    ///
222    /// This function panics if self is anything other than an `Event::Trace`.
223    pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
224        match self {
225            Event::Trace(trace) => trace,
226            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
227        }
228    }
229
230    /// Coerces self into a `TraceEvent`
231    ///
232    /// # Panics
233    ///
234    /// This function panics if self is anything other than an `Event::Trace`.
235    pub fn into_trace(self) -> TraceEvent {
236        match self {
237            Event::Trace(trace) => trace,
238            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
239        }
240    }
241
242    /// Fallibly coerces self into a `TraceEvent`
243    ///
244    /// If the event is a `TraceEvent`, then `Some(trace)` is returned, otherwise `None`.
245    pub fn try_into_trace(self) -> Option<TraceEvent> {
246        match self {
247            Event::Trace(trace) => Some(trace),
248            _ => None,
249        }
250    }
251
252    pub fn metadata(&self) -> &EventMetadata {
253        match self {
254            Self::Log(log) => log.metadata(),
255            Self::Metric(metric) => metric.metadata(),
256            Self::Trace(trace) => trace.metadata(),
257        }
258    }
259
260    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
261        match self {
262            Self::Log(log) => log.metadata_mut(),
263            Self::Metric(metric) => metric.metadata_mut(),
264            Self::Trace(trace) => trace.metadata_mut(),
265        }
266    }
267
268    /// Destroy the event and return the metadata.
269    pub fn into_metadata(self) -> EventMetadata {
270        match self {
271            Self::Log(log) => log.into_parts().1,
272            Self::Metric(metric) => metric.into_parts().2,
273            Self::Trace(trace) => trace.into_parts().1,
274        }
275    }
276
277    #[must_use]
278    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
279        match self {
280            Self::Log(log) => log.with_batch_notifier(batch).into(),
281            Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
282            Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
283        }
284    }
285
286    #[must_use]
287    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
288        match self {
289            Self::Log(log) => log.with_batch_notifier_option(batch).into(),
290            Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
291            Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
292        }
293    }
294
295    /// Returns a reference to the event metadata source.
296    #[must_use]
297    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
298        self.metadata().source_id()
299    }
300
301    /// Sets the `source_id` in the event metadata to the provided value.
302    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
303        self.metadata_mut().set_source_id(source_id);
304    }
305
306    /// Sets the `upstream_id` in the event metadata to the provided value.
307    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
308        self.metadata_mut().set_upstream_id(upstream_id);
309    }
310
311    /// Sets the `source_type` in the event metadata to the provided value.
312    pub fn set_source_type(&mut self, source_type: &'static str) {
313        self.metadata_mut().set_source_type(source_type);
314    }
315
316    /// Sets the `source_id` in the event metadata to the provided value.
317    #[must_use]
318    pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
319        self.metadata_mut().set_source_id(source_id);
320        self
321    }
322
323    /// Sets the `source_type` in the event metadata to the provided value.
324    #[must_use]
325    pub fn with_source_type(mut self, source_type: &'static str) -> Self {
326        self.metadata_mut().set_source_type(source_type);
327        self
328    }
329
330    /// Sets the `upstream_id` in the event metadata to the provided value.
331    #[must_use]
332    pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
333        self.metadata_mut().set_upstream_id(upstream_id);
334        self
335    }
336
337    /// Creates an Event from a JSON value.
338    ///
339    /// # Errors
340    /// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error.
341    pub fn from_json_value(
342        value: serde_json::Value,
343        log_namespace: LogNamespace,
344    ) -> crate::Result<Self> {
345        match log_namespace {
346            LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
347            LogNamespace::Legacy => match value {
348                serde_json::Value::Object(fields) => Ok(LogEvent::from(
349                    fields
350                        .into_iter()
351                        .map(|(k, v)| (k.into(), v.into()))
352                        .collect::<ObjectMap>(),
353                )
354                .into()),
355                _ => Err(crate::Error::from(
356                    "Attempted to convert non-Object JSON into an Event.",
357                )),
358            },
359        }
360    }
361}
362
363impl EventDataEq for Event {
364    fn event_data_eq(&self, other: &Self) -> bool {
365        match (self, other) {
366            (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
367            (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
368            (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
369            _ => false,
370        }
371    }
372}
373
374impl finalization::AddBatchNotifier for Event {
375    fn add_batch_notifier(&mut self, batch: BatchNotifier) {
376        let finalizer = EventFinalizer::new(batch);
377        match self {
378            Self::Log(log) => log.add_finalizer(finalizer),
379            Self::Metric(metric) => metric.add_finalizer(finalizer),
380            Self::Trace(trace) => trace.add_finalizer(finalizer),
381        }
382    }
383}
384
385impl TryInto<serde_json::Value> for Event {
386    type Error = serde_json::Error;
387
388    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
389        match self {
390            Event::Log(fields) => serde_json::to_value(fields),
391            Event::Metric(metric) => serde_json::to_value(metric),
392            Event::Trace(fields) => serde_json::to_value(fields),
393        }
394    }
395}
396
397impl From<proto::StatisticKind> for StatisticKind {
398    fn from(kind: proto::StatisticKind) -> Self {
399        match kind {
400            proto::StatisticKind::Histogram => StatisticKind::Histogram,
401            proto::StatisticKind::Summary => StatisticKind::Summary,
402        }
403    }
404}
405
406impl From<metric::Sample> for proto::DistributionSample {
407    fn from(sample: metric::Sample) -> Self {
408        Self {
409            value: sample.value,
410            rate: sample.rate,
411        }
412    }
413}
414
415impl From<proto::DistributionSample> for metric::Sample {
416    fn from(sample: proto::DistributionSample) -> Self {
417        Self {
418            value: sample.value,
419            rate: sample.rate,
420        }
421    }
422}
423
424impl From<proto::HistogramBucket> for metric::Bucket {
425    fn from(bucket: proto::HistogramBucket) -> Self {
426        Self {
427            upper_limit: bucket.upper_limit,
428            count: u64::from(bucket.count),
429        }
430    }
431}
432
433impl From<metric::Bucket> for proto::HistogramBucket3 {
434    fn from(bucket: metric::Bucket) -> Self {
435        Self {
436            upper_limit: bucket.upper_limit,
437            count: bucket.count,
438        }
439    }
440}
441
442impl From<proto::HistogramBucket3> for metric::Bucket {
443    fn from(bucket: proto::HistogramBucket3) -> Self {
444        Self {
445            upper_limit: bucket.upper_limit,
446            count: bucket.count,
447        }
448    }
449}
450
451impl From<metric::Quantile> for proto::SummaryQuantile {
452    fn from(quantile: metric::Quantile) -> Self {
453        Self {
454            quantile: quantile.quantile,
455            value: quantile.value,
456        }
457    }
458}
459
460impl From<proto::SummaryQuantile> for metric::Quantile {
461    fn from(quantile: proto::SummaryQuantile) -> Self {
462        Self {
463            quantile: quantile.quantile,
464            value: quantile.value,
465        }
466    }
467}
468
469impl From<LogEvent> for Event {
470    fn from(log: LogEvent) -> Self {
471        Event::Log(log)
472    }
473}
474
475impl From<Metric> for Event {
476    fn from(metric: Metric) -> Self {
477        Event::Metric(metric)
478    }
479}
480
481impl From<TraceEvent> for Event {
482    fn from(trace: TraceEvent) -> Self {
483        Event::Trace(trace)
484    }
485}
486
/// Optional mutable access to a `LogEvent`.
pub trait MaybeAsLogMut {
    /// Returns a mutable reference to a `LogEvent` when one is available, otherwise `None`.
    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
}
490
491impl MaybeAsLogMut for Event {
492    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
493        match self {
494            Event::Log(log) => Some(log),
495            _ => None,
496        }
497    }
498}