vector_core/event/
mod.rs

1use std::{convert::TryInto, fmt::Debug, sync::Arc};
2
3pub use array::{EventArray, EventContainer, LogArray, MetricArray, TraceArray, into_event_stream};
4pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
5pub use finalization::{
6    BatchNotifier, BatchStatus, BatchStatusReceiver, EventFinalizer, EventFinalizers, EventStatus,
7    Finalizable,
8};
9pub use log_event::LogEvent;
10pub use metadata::{DatadogMetricOriginMetadata, EventMetadata, WithMetadata};
11pub use metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
12pub use r#ref::{EventMutRef, EventRef};
13use serde::{Deserialize, Serialize};
14pub use trace::TraceEvent;
15use vector_buffers::EventCount;
16use vector_common::{
17    EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey, finalization,
18    internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
19};
20pub use vrl::value::{KeyString, ObjectMap, Value};
21#[cfg(feature = "vrl")]
22pub use vrl_target::{TargetEvents, VrlTarget};
23
24use crate::config::{LogNamespace, OutputId};
25
26pub mod array;
27pub mod discriminant;
28mod estimated_json_encoded_size_of;
29mod log_event;
30#[cfg(feature = "lua")]
31pub mod lua;
32pub mod merge_state;
33mod metadata;
34pub mod metric;
35pub mod proto;
36mod r#ref;
37mod ser;
38#[cfg(test)]
39mod test;
40mod trace;
41pub mod util;
42#[cfg(feature = "vrl")]
43mod vrl_target;
44
45pub const PARTIAL: &str = "_partial";
46
47#[derive(PartialEq, Debug, Clone, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49#[allow(clippy::large_enum_variant)]
50pub enum Event {
51    Log(LogEvent),
52    Metric(Metric),
53    Trace(TraceEvent),
54}
55
56impl ByteSizeOf for Event {
57    fn allocated_bytes(&self) -> usize {
58        match self {
59            Event::Log(log_event) => log_event.allocated_bytes(),
60            Event::Metric(metric_event) => metric_event.allocated_bytes(),
61            Event::Trace(trace_event) => trace_event.allocated_bytes(),
62        }
63    }
64}
65
66impl EstimatedJsonEncodedSizeOf for Event {
67    fn estimated_json_encoded_size_of(&self) -> JsonSize {
68        match self {
69            Event::Log(log_event) => log_event.estimated_json_encoded_size_of(),
70            Event::Metric(metric_event) => metric_event.estimated_json_encoded_size_of(),
71            Event::Trace(trace_event) => trace_event.estimated_json_encoded_size_of(),
72        }
73    }
74}
75
76impl EventCount for Event {
77    fn event_count(&self) -> usize {
78        1
79    }
80}
81
82impl Finalizable for Event {
83    fn take_finalizers(&mut self) -> EventFinalizers {
84        match self {
85            Event::Log(log_event) => log_event.take_finalizers(),
86            Event::Metric(metric) => metric.take_finalizers(),
87            Event::Trace(trace_event) => trace_event.take_finalizers(),
88        }
89    }
90}
91
92impl GetEventCountTags for Event {
93    fn get_tags(&self) -> TaggedEventsSent {
94        match self {
95            Event::Log(log) => log.get_tags(),
96            Event::Metric(metric) => metric.get_tags(),
97            Event::Trace(trace) => trace.get_tags(),
98        }
99    }
100}
101
102impl Event {
103    /// Return self as a `LogEvent`
104    ///
105    /// # Panics
106    ///
107    /// This function panics if self is anything other than an `Event::Log`.
108    pub fn as_log(&self) -> &LogEvent {
109        match self {
110            Event::Log(log) => log,
111            _ => panic!("Failed type coercion, {self:?} is not a log event"),
112        }
113    }
114
115    /// Return self as a mutable `LogEvent`
116    ///
117    /// # Panics
118    ///
119    /// This function panics if self is anything other than an `Event::Log`.
120    pub fn as_mut_log(&mut self) -> &mut LogEvent {
121        match self {
122            Event::Log(log) => log,
123            _ => panic!("Failed type coercion, {self:?} is not a log event"),
124        }
125    }
126
127    /// Coerces self into a `LogEvent`
128    ///
129    /// # Panics
130    ///
131    /// This function panics if self is anything other than an `Event::Log`.
132    pub fn into_log(self) -> LogEvent {
133        match self {
134            Event::Log(log) => log,
135            _ => panic!("Failed type coercion, {self:?} is not a log event"),
136        }
137    }
138
139    /// Fallibly coerces self into a `LogEvent`
140    ///
141    /// If the event is a `LogEvent`, then `Some(log_event)` is returned, otherwise `None`.
142    pub fn try_into_log(self) -> Option<LogEvent> {
143        match self {
144            Event::Log(log) => Some(log),
145            _ => None,
146        }
147    }
148
149    /// Return self as a `LogEvent` if possible
150    ///
151    /// If the event is a `LogEvent`, then `Some(&log_event)` is returned, otherwise `None`.
152    pub fn maybe_as_log(&self) -> Option<&LogEvent> {
153        match self {
154            Event::Log(log) => Some(log),
155            _ => None,
156        }
157    }
158
159    /// Return self as a `Metric`
160    ///
161    /// # Panics
162    ///
163    /// This function panics if self is anything other than an `Event::Metric`.
164    pub fn as_metric(&self) -> &Metric {
165        match self {
166            Event::Metric(metric) => metric,
167            _ => panic!("Failed type coercion, {self:?} is not a metric"),
168        }
169    }
170
171    /// Return self as a mutable `Metric`
172    ///
173    /// # Panics
174    ///
175    /// This function panics if self is anything other than an `Event::Metric`.
176    pub fn as_mut_metric(&mut self) -> &mut Metric {
177        match self {
178            Event::Metric(metric) => metric,
179            _ => panic!("Failed type coercion, {self:?} is not a metric"),
180        }
181    }
182
183    /// Coerces self into `Metric`
184    ///
185    /// # Panics
186    ///
187    /// This function panics if self is anything other than an `Event::Metric`.
188    pub fn into_metric(self) -> Metric {
189        match self {
190            Event::Metric(metric) => metric,
191            _ => panic!("Failed type coercion, {self:?} is not a metric"),
192        }
193    }
194
195    /// Fallibly coerces self into a `Metric`
196    ///
197    /// If the event is a `Metric`, then `Some(metric)` is returned, otherwise `None`.
198    pub fn try_into_metric(self) -> Option<Metric> {
199        match self {
200            Event::Metric(metric) => Some(metric),
201            _ => None,
202        }
203    }
204
205    /// Return self as a `TraceEvent`
206    ///
207    /// # Panics
208    ///
209    /// This function panics if self is anything other than an `Event::Trace`.
210    pub fn as_trace(&self) -> &TraceEvent {
211        match self {
212            Event::Trace(trace) => trace,
213            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
214        }
215    }
216
217    /// Return self as a mutable `TraceEvent`
218    ///
219    /// # Panics
220    ///
221    /// This function panics if self is anything other than an `Event::Trace`.
222    pub fn as_mut_trace(&mut self) -> &mut TraceEvent {
223        match self {
224            Event::Trace(trace) => trace,
225            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
226        }
227    }
228
229    /// Coerces self into a `TraceEvent`
230    ///
231    /// # Panics
232    ///
233    /// This function panics if self is anything other than an `Event::Trace`.
234    pub fn into_trace(self) -> TraceEvent {
235        match self {
236            Event::Trace(trace) => trace,
237            _ => panic!("Failed type coercion, {self:?} is not a trace event"),
238        }
239    }
240
241    /// Fallibly coerces self into a `TraceEvent`
242    ///
243    /// If the event is a `TraceEvent`, then `Some(trace)` is returned, otherwise `None`.
244    pub fn try_into_trace(self) -> Option<TraceEvent> {
245        match self {
246            Event::Trace(trace) => Some(trace),
247            _ => None,
248        }
249    }
250
251    pub fn metadata(&self) -> &EventMetadata {
252        match self {
253            Self::Log(log) => log.metadata(),
254            Self::Metric(metric) => metric.metadata(),
255            Self::Trace(trace) => trace.metadata(),
256        }
257    }
258
259    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
260        match self {
261            Self::Log(log) => log.metadata_mut(),
262            Self::Metric(metric) => metric.metadata_mut(),
263            Self::Trace(trace) => trace.metadata_mut(),
264        }
265    }
266
267    /// Destroy the event and return the metadata.
268    pub fn into_metadata(self) -> EventMetadata {
269        match self {
270            Self::Log(log) => log.into_parts().1,
271            Self::Metric(metric) => metric.into_parts().2,
272            Self::Trace(trace) => trace.into_parts().1,
273        }
274    }
275
276    #[must_use]
277    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
278        match self {
279            Self::Log(log) => log.with_batch_notifier(batch).into(),
280            Self::Metric(metric) => metric.with_batch_notifier(batch).into(),
281            Self::Trace(trace) => trace.with_batch_notifier(batch).into(),
282        }
283    }
284
285    #[must_use]
286    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
287        match self {
288            Self::Log(log) => log.with_batch_notifier_option(batch).into(),
289            Self::Metric(metric) => metric.with_batch_notifier_option(batch).into(),
290            Self::Trace(trace) => trace.with_batch_notifier_option(batch).into(),
291        }
292    }
293
294    /// Returns a reference to the event metadata source.
295    #[must_use]
296    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
297        self.metadata().source_id()
298    }
299
300    /// Sets the `source_id` in the event metadata to the provided value.
301    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
302        self.metadata_mut().set_source_id(source_id);
303    }
304
305    /// Sets the `upstream_id` in the event metadata to the provided value.
306    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
307        self.metadata_mut().set_upstream_id(upstream_id);
308    }
309
310    /// Sets the `source_type` in the event metadata to the provided value.
311    pub fn set_source_type(&mut self, source_type: &'static str) {
312        self.metadata_mut().set_source_type(source_type);
313    }
314
315    /// Sets the `source_id` in the event metadata to the provided value.
316    #[must_use]
317    pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
318        self.metadata_mut().set_source_id(source_id);
319        self
320    }
321
322    /// Sets the `source_type` in the event metadata to the provided value.
323    #[must_use]
324    pub fn with_source_type(mut self, source_type: &'static str) -> Self {
325        self.metadata_mut().set_source_type(source_type);
326        self
327    }
328
329    /// Sets the `upstream_id` in the event metadata to the provided value.
330    #[must_use]
331    pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
332        self.metadata_mut().set_upstream_id(upstream_id);
333        self
334    }
335
336    /// Creates an Event from a JSON value.
337    ///
338    /// # Errors
339    /// If a non-object JSON value is passed in with the `Legacy` namespace, this will return an error.
340    pub fn from_json_value(
341        value: serde_json::Value,
342        log_namespace: LogNamespace,
343    ) -> crate::Result<Self> {
344        match log_namespace {
345            LogNamespace::Vector => Ok(LogEvent::from(Value::from(value)).into()),
346            LogNamespace::Legacy => match value {
347                serde_json::Value::Object(fields) => Ok(LogEvent::from(
348                    fields
349                        .into_iter()
350                        .map(|(k, v)| (k.into(), v.into()))
351                        .collect::<ObjectMap>(),
352                )
353                .into()),
354                _ => Err(crate::Error::from(
355                    "Attempted to convert non-Object JSON into an Event.",
356                )),
357            },
358        }
359    }
360}
361
362impl EventDataEq for Event {
363    fn event_data_eq(&self, other: &Self) -> bool {
364        match (self, other) {
365            (Self::Log(a), Self::Log(b)) => a.event_data_eq(b),
366            (Self::Metric(a), Self::Metric(b)) => a.event_data_eq(b),
367            (Self::Trace(a), Self::Trace(b)) => a.event_data_eq(b),
368            _ => false,
369        }
370    }
371}
372
373impl finalization::AddBatchNotifier for Event {
374    fn add_batch_notifier(&mut self, batch: BatchNotifier) {
375        let finalizer = EventFinalizer::new(batch);
376        match self {
377            Self::Log(log) => log.add_finalizer(finalizer),
378            Self::Metric(metric) => metric.add_finalizer(finalizer),
379            Self::Trace(trace) => trace.add_finalizer(finalizer),
380        }
381    }
382}
383
384impl TryInto<serde_json::Value> for Event {
385    type Error = serde_json::Error;
386
387    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
388        match self {
389            Event::Log(fields) => serde_json::to_value(fields),
390            Event::Metric(metric) => serde_json::to_value(metric),
391            Event::Trace(fields) => serde_json::to_value(fields),
392        }
393    }
394}
395
396impl From<proto::StatisticKind> for StatisticKind {
397    fn from(kind: proto::StatisticKind) -> Self {
398        match kind {
399            proto::StatisticKind::Histogram => StatisticKind::Histogram,
400            proto::StatisticKind::Summary => StatisticKind::Summary,
401        }
402    }
403}
404
405impl From<metric::Sample> for proto::DistributionSample {
406    fn from(sample: metric::Sample) -> Self {
407        Self {
408            value: sample.value,
409            rate: sample.rate,
410        }
411    }
412}
413
414impl From<proto::DistributionSample> for metric::Sample {
415    fn from(sample: proto::DistributionSample) -> Self {
416        Self {
417            value: sample.value,
418            rate: sample.rate,
419        }
420    }
421}
422
423impl From<proto::HistogramBucket> for metric::Bucket {
424    fn from(bucket: proto::HistogramBucket) -> Self {
425        Self {
426            upper_limit: bucket.upper_limit,
427            count: u64::from(bucket.count),
428        }
429    }
430}
431
432impl From<metric::Bucket> for proto::HistogramBucket3 {
433    fn from(bucket: metric::Bucket) -> Self {
434        Self {
435            upper_limit: bucket.upper_limit,
436            count: bucket.count,
437        }
438    }
439}
440
441impl From<proto::HistogramBucket3> for metric::Bucket {
442    fn from(bucket: proto::HistogramBucket3) -> Self {
443        Self {
444            upper_limit: bucket.upper_limit,
445            count: bucket.count,
446        }
447    }
448}
449
450impl From<metric::Quantile> for proto::SummaryQuantile {
451    fn from(quantile: metric::Quantile) -> Self {
452        Self {
453            quantile: quantile.quantile,
454            value: quantile.value,
455        }
456    }
457}
458
459impl From<proto::SummaryQuantile> for metric::Quantile {
460    fn from(quantile: proto::SummaryQuantile) -> Self {
461        Self {
462            quantile: quantile.quantile,
463            value: quantile.value,
464        }
465    }
466}
467
468impl From<LogEvent> for Event {
469    fn from(log: LogEvent) -> Self {
470        Event::Log(log)
471    }
472}
473
474impl From<Metric> for Event {
475    fn from(metric: Metric) -> Self {
476        Event::Metric(metric)
477    }
478}
479
480impl From<TraceEvent> for Event {
481    fn from(trace: TraceEvent) -> Self {
482        Event::Trace(trace)
483    }
484}
485
486pub trait MaybeAsLogMut {
487    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent>;
488}
489
490impl MaybeAsLogMut for Event {
491    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
492        match self {
493            Event::Log(log) => Some(log),
494            _ => None,
495        }
496    }
497}