// vector_core/event/log_event.rs

1use std::{
2    collections::HashMap,
3    convert::{TryFrom, TryInto},
4    fmt::Debug,
5    iter::FromIterator,
6    mem::size_of,
7    num::NonZeroUsize,
8    sync::{Arc, LazyLock},
9};
10
11use bytes::Bytes;
12use chrono::Utc;
13use crossbeam_utils::atomic::AtomicCell;
14use lookup::{PathPrefix, lookup_v2::TargetPath, metadata_path, path};
15use serde::{Deserialize, Serialize, Serializer};
16use vector_common::{
17    EventDataEq,
18    byte_size_of::ByteSizeOf,
19    internal_event::{OptionalTag, TaggedEventsSent},
20    json_size::{JsonSize, NonZeroJsonSize},
21    request_metadata::GetEventCountTags,
22};
23use vrl::{
24    event_path, owned_value_path,
25    path::{OwnedTargetPath, PathParseError, parse_target_path},
26};
27
28use super::{
29    EventFinalizers, Finalizable, KeyString, ObjectMap, Value,
30    estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
31    finalization::{BatchNotifier, EventFinalizer},
32    metadata::EventMetadata,
33    util,
34};
35use crate::{
36    config::{LogNamespace, log_schema, telemetry},
37    event::{
38        MaybeAsLogMut,
39        util::log::{all_fields, all_fields_skip_array_elements, all_metadata_fields},
40    },
41};
42
43static VECTOR_SOURCE_TYPE_PATH: LazyLock<Option<OwnedTargetPath>> = LazyLock::new(|| {
44    Some(OwnedTargetPath::metadata(owned_value_path!(
45        "vector",
46        "source_type"
47    )))
48});
49
50#[derive(Debug, Deserialize)]
51struct Inner {
52    #[serde(flatten)]
53    fields: Value,
54
55    #[serde(skip)]
56    size_cache: AtomicCell<Option<NonZeroUsize>>,
57
58    #[serde(skip)]
59    json_encoded_size_cache: AtomicCell<Option<NonZeroJsonSize>>,
60}
61
62impl Inner {
63    fn invalidate(&self) {
64        self.size_cache.store(None);
65        self.json_encoded_size_cache.store(None);
66    }
67
68    fn as_value(&self) -> &Value {
69        &self.fields
70    }
71}
72
73impl ByteSizeOf for Inner {
74    fn size_of(&self) -> usize {
75        self.size_cache
76            .load()
77            .unwrap_or_else(|| {
78                let size = size_of::<Self>() + self.allocated_bytes();
79                // The size of self will always be non-zero, and
80                // adding the allocated bytes cannot make it overflow
81                // since `usize` has a range the same as pointer
82                // space. Hence, the expect below cannot fail.
83                let size = NonZeroUsize::new(size).expect("Size cannot be zero");
84                self.size_cache.store(Some(size));
85                size
86            })
87            .into()
88    }
89
90    fn allocated_bytes(&self) -> usize {
91        self.fields.allocated_bytes()
92    }
93}
94
95impl EstimatedJsonEncodedSizeOf for Inner {
96    fn estimated_json_encoded_size_of(&self) -> JsonSize {
97        self.json_encoded_size_cache
98            .load()
99            .unwrap_or_else(|| {
100                let size = self.fields.estimated_json_encoded_size_of();
101                let size = NonZeroJsonSize::new(size).expect("Size cannot be zero");
102
103                self.json_encoded_size_cache.store(Some(size));
104                size
105            })
106            .into()
107    }
108}
109
110impl Clone for Inner {
111    fn clone(&self) -> Self {
112        Self {
113            fields: self.fields.clone(),
114            // This clone is only ever used in combination with
115            // `Arc::make_mut`, so don't bother fetching the size
116            // cache to copy it since it will be invalidated anyways.
117            size_cache: None.into(),
118
119            // This clone is only ever used in combination with
120            // `Arc::make_mut`, so don't bother fetching the size
121            // cache to copy it since it will be invalidated anyways.
122            json_encoded_size_cache: None.into(),
123        }
124    }
125}
126
127impl Default for Inner {
128    fn default() -> Self {
129        Self {
130            // **IMPORTANT:** Due to numerous legacy reasons this **must** be a Map variant.
131            fields: Value::Object(Default::default()),
132            size_cache: Default::default(),
133            json_encoded_size_cache: Default::default(),
134        }
135    }
136}
137
138impl From<Value> for Inner {
139    fn from(fields: Value) -> Self {
140        Self {
141            fields,
142            size_cache: Default::default(),
143            json_encoded_size_cache: Default::default(),
144        }
145    }
146}
147
148impl PartialEq for Inner {
149    fn eq(&self, other: &Self) -> bool {
150        self.fields.eq(&other.fields)
151    }
152}
153
154#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
155pub struct LogEvent {
156    #[serde(flatten)]
157    inner: Arc<Inner>,
158
159    #[serde(skip)]
160    metadata: EventMetadata,
161}
162
163impl LogEvent {
164    /// This used to be the implementation for `LogEvent::from(&'str)`, but this is now only
165    /// valid for `LogNamespace::Legacy`
166    pub fn from_str_legacy(msg: impl Into<String>) -> Self {
167        let mut log = LogEvent::default();
168        log.maybe_insert(log_schema().message_key_target_path(), msg.into());
169
170        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
171            log.insert(timestamp_key, Utc::now());
172        }
173
174        log
175    }
176
177    /// This used to be the implementation for `LogEvent::from(Bytes)`, but this is now only
178    /// valid for `LogNamespace::Legacy`
179    pub fn from_bytes_legacy(msg: &Bytes) -> Self {
180        Self::from_str_legacy(String::from_utf8_lossy(msg.as_ref()).to_string())
181    }
182
183    pub fn value(&self) -> &Value {
184        self.inner.as_ref().as_value()
185    }
186
187    pub fn value_mut(&mut self) -> &mut Value {
188        let result = Arc::make_mut(&mut self.inner);
189        // We MUST invalidate the inner size cache when making a
190        // mutable copy, since the _next_ action will modify the data.
191        result.invalidate();
192        &mut result.fields
193    }
194
195    pub fn metadata(&self) -> &EventMetadata {
196        &self.metadata
197    }
198
199    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
200        &mut self.metadata
201    }
202
203    /// This detects the log namespace used at runtime by checking for the existence
204    /// of the read-only "vector" metadata, which only exists (and is required to exist)
205    /// with the `Vector` log namespace.
206    pub fn namespace(&self) -> LogNamespace {
207        if self.contains((PathPrefix::Metadata, path!("vector"))) {
208            LogNamespace::Vector
209        } else {
210            LogNamespace::Legacy
211        }
212    }
213}
214
215impl ByteSizeOf for LogEvent {
216    fn allocated_bytes(&self) -> usize {
217        self.inner.size_of() + self.metadata.allocated_bytes()
218    }
219}
220
221impl Finalizable for LogEvent {
222    fn take_finalizers(&mut self) -> EventFinalizers {
223        self.metadata.take_finalizers()
224    }
225}
226
227impl EstimatedJsonEncodedSizeOf for LogEvent {
228    fn estimated_json_encoded_size_of(&self) -> JsonSize {
229        self.inner.estimated_json_encoded_size_of()
230    }
231}
232
233impl GetEventCountTags for LogEvent {
234    fn get_tags(&self) -> TaggedEventsSent {
235        let source = if telemetry().tags().emit_source {
236            self.metadata().source_id().cloned().into()
237        } else {
238            OptionalTag::Ignored
239        };
240
241        let service = if telemetry().tags().emit_service {
242            self.get_by_meaning("service")
243                .map(|value| value.to_string_lossy().to_string())
244                .into()
245        } else {
246            OptionalTag::Ignored
247        };
248
249        TaggedEventsSent { source, service }
250    }
251}
252
253impl LogEvent {
254    #[must_use]
255    pub fn new_with_metadata(metadata: EventMetadata) -> Self {
256        Self {
257            inner: Default::default(),
258            metadata,
259        }
260    }
261
262    ///  Create a `LogEvent` from a `Value` and `EventMetadata`
263    pub fn from_parts(value: Value, metadata: EventMetadata) -> Self {
264        Self {
265            inner: Arc::new(value.into()),
266            metadata,
267        }
268    }
269
270    ///  Create a `LogEvent` from an `ObjectMap` and `EventMetadata`
271    pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self {
272        let inner = Arc::new(Inner::from(Value::Object(map)));
273        Self { inner, metadata }
274    }
275
276    /// Convert a `LogEvent` into a tuple of its components
277    pub fn into_parts(mut self) -> (Value, EventMetadata) {
278        self.value_mut();
279
280        let value = Arc::try_unwrap(self.inner)
281            .unwrap_or_else(|_| unreachable!("inner fields already cloned after owning"))
282            .fields;
283        let metadata = self.metadata;
284        (value, metadata)
285    }
286
287    #[must_use]
288    pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
289        self.metadata = self.metadata.with_batch_notifier(batch);
290        self
291    }
292
293    #[must_use]
294    pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
295        self.metadata = self.metadata.with_batch_notifier_option(batch);
296        self
297    }
298
299    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
300        self.metadata.add_finalizer(finalizer);
301    }
302
303    /// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
304    /// # Errors
305    /// Will return an error if path parsing failed.
306    pub fn parse_path_and_get_value(
307        &self,
308        path: impl AsRef<str>,
309    ) -> Result<Option<&Value>, PathParseError> {
310        parse_target_path(path.as_ref()).map(|path| self.get(&path))
311    }
312
313    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
314    pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
315        match key.prefix() {
316            PathPrefix::Event => self.inner.fields.get(key.value_path()),
317            PathPrefix::Metadata => self.metadata.value().get(key.value_path()),
318        }
319    }
320
321    /// Retrieves the value of a field based on it's meaning.
322    /// This will first check if the value has previously been dropped. It is worth being
323    /// aware that if the field has been dropped and then somehow re-added, we still fetch
324    /// the dropped value here.
325    pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
326        self.metadata().dropped_field(&meaning).or_else(|| {
327            self.metadata()
328                .schema_definition()
329                .meaning_path(meaning.as_ref())
330                .and_then(|path| self.get(path))
331        })
332    }
333
334    /// Retrieves the mutable value of a field based on it's meaning.
335    /// Note that this does _not_ check the dropped fields, unlike `get_by_meaning`, since the
336    /// purpose of the mutable reference is to be able to modify the value and modifying the dropped
337    /// fields has no effect on the resulting event.
338    pub fn get_mut_by_meaning(&mut self, meaning: impl AsRef<str>) -> Option<&mut Value> {
339        Arc::clone(self.metadata.schema_definition())
340            .meaning_path(meaning.as_ref())
341            .and_then(|path| self.get_mut(path))
342    }
343
344    /// Retrieves the target path of a field based on the specified `meaning`.
345    pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
346        self.metadata()
347            .schema_definition()
348            .meaning_path(meaning.as_ref())
349    }
350
351    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
352    pub fn get_mut<'a>(&mut self, path: impl TargetPath<'a>) -> Option<&mut Value> {
353        match path.prefix() {
354            PathPrefix::Event => self.value_mut().get_mut(path.value_path()),
355            PathPrefix::Metadata => self.metadata.value_mut().get_mut(path.value_path()),
356        }
357    }
358
359    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
360    pub fn contains<'a>(&self, path: impl TargetPath<'a>) -> bool {
361        match path.prefix() {
362            PathPrefix::Event => self.value().contains(path.value_path()),
363            PathPrefix::Metadata => self.metadata.value().contains(path.value_path()),
364        }
365    }
366
367    /// Parse the specified `path` and if there are no parsing errors, attempt to insert the specified `value`.
368    ///
369    /// # Errors
370    /// Will return an error if path parsing failed.
371    pub fn parse_path_and_insert(
372        &mut self,
373        path: impl AsRef<str>,
374        value: impl Into<Value>,
375    ) -> Result<Option<Value>, PathParseError> {
376        let target_path = parse_target_path(path.as_ref())?;
377        Ok(self.insert(&target_path, value))
378    }
379
380    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
381    pub fn insert<'a>(
382        &mut self,
383        path: impl TargetPath<'a>,
384        value: impl Into<Value>,
385    ) -> Option<Value> {
386        match path.prefix() {
387            PathPrefix::Event => self.value_mut().insert(path.value_path(), value.into()),
388            PathPrefix::Metadata => self
389                .metadata
390                .value_mut()
391                .insert(path.value_path(), value.into()),
392        }
393    }
394
395    pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
396        if let Some(path) = path {
397            self.insert(path, value);
398        }
399    }
400
401    // deprecated - using this means the schema is unknown
402    pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into<Value>) {
403        if !self.contains(path.clone()) {
404            self.insert(path, value);
405        }
406    }
407
408    /// Rename a key
409    ///
410    /// If `to_key` already exists in the structure its value will be overwritten.
411    pub fn rename_key<'a>(&mut self, from: impl TargetPath<'a>, to: impl TargetPath<'a>) {
412        if let Some(val) = self.remove(from) {
413            self.insert(to, val);
414        }
415    }
416
417    pub fn remove<'a>(&mut self, path: impl TargetPath<'a>) -> Option<Value> {
418        self.remove_prune(path, false)
419    }
420
421    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
422    pub fn remove_prune<'a>(&mut self, path: impl TargetPath<'a>, prune: bool) -> Option<Value> {
423        match path.prefix() {
424            PathPrefix::Event => self.value_mut().remove(path.value_path(), prune),
425            PathPrefix::Metadata => self.metadata.value_mut().remove(path.value_path(), prune),
426        }
427    }
428
429    pub fn keys(&self) -> Option<impl Iterator<Item = KeyString> + '_> {
430        match &self.inner.fields {
431            Value::Object(map) => Some(util::log::keys(map)),
432            _ => None,
433        }
434    }
435
436    /// If the event root value is a map, build and return an iterator to event field and value pairs.
437    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
438    pub fn all_event_fields(
439        &self,
440    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
441        self.as_map().map(all_fields)
442    }
443
444    /// Similar to [`LogEvent::all_event_fields`], but doesn't traverse individual array elements.
445    pub fn all_event_fields_skip_array_elements(
446        &self,
447    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
448        self.as_map().map(all_fields_skip_array_elements)
449    }
450
451    /// If the metadata root value is a map, build and return an iterator to metadata field and value pairs.
452    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
453    pub fn all_metadata_fields(
454        &self,
455    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
456        match self.metadata.value() {
457            Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
458            _ => None,
459        }
460    }
461
462    /// Returns an iterator of all fields if the value is an Object. Otherwise, a single field is
463    /// returned with a "message" key. Field names that are could be interpreted as alternate paths
464    /// (i.e. containing periods, square brackets, etc) are quoted.
465    pub fn convert_to_fields(&self) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
466        if let Some(map) = self.as_map() {
467            util::log::all_fields(map)
468        } else {
469            util::log::all_fields_non_object_root(self.value())
470        }
471    }
472
473    /// Returns an iterator of all fields if the value is an Object. Otherwise, a single field is
474    /// returned with a "message" key. Field names are not quoted.
475    pub fn convert_to_fields_unquoted(
476        &self,
477    ) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
478        if let Some(map) = self.as_map() {
479            util::log::all_fields_unquoted(map)
480        } else {
481            util::log::all_fields_non_object_root(self.value())
482        }
483    }
484
485    pub fn is_empty_object(&self) -> bool {
486        if let Some(map) = self.as_map() {
487            map.is_empty()
488        } else {
489            false
490        }
491    }
492
493    pub fn as_map(&self) -> Option<&ObjectMap> {
494        match self.value() {
495            Value::Object(map) => Some(map),
496            _ => None,
497        }
498    }
499
500    pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> {
501        match self.value_mut() {
502            Value::Object(map) => Some(map),
503            _ => None,
504        }
505    }
506
507    /// Merge all fields specified at `fields` from `incoming` to `current`.
508    /// Note that `fields` containing dots and other special characters will be treated as a single segment.
509    pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
510        for field in fields {
511            let field_path = event_path!(field.as_ref());
512            let Some(incoming_val) = incoming.remove(field_path) else {
513                continue;
514            };
515            match self.get_mut(field_path) {
516                None => {
517                    self.insert(field_path, incoming_val);
518                }
519                Some(current_val) => current_val.merge(incoming_val),
520            }
521        }
522        self.metadata.merge(incoming.metadata);
523    }
524}
525
526/// Log Namespace utility methods. These can only be used when an event has a
527/// valid schema definition set (which should be on every event in transforms and sinks).
528impl LogEvent {
529    /// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace)
530    /// or from the message key set on the "Global Log Schema" (Legacy namespace).
531    pub fn message_path(&self) -> Option<&OwnedTargetPath> {
532        match self.namespace() {
533            LogNamespace::Vector => self.find_key_by_meaning("message"),
534            LogNamespace::Legacy => log_schema().message_key_target_path(),
535        }
536    }
537
538    /// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
539    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
540    pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
541        match self.namespace() {
542            LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
543            LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
544        }
545    }
546
547    /// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace)
548    /// or from the host key set on the "Global Log Schema" (Legacy namespace).
549    pub fn host_path(&self) -> Option<&OwnedTargetPath> {
550        match self.namespace() {
551            LogNamespace::Vector => self.find_key_by_meaning("host"),
552            LogNamespace::Legacy => log_schema().host_key_target_path(),
553        }
554    }
555
556    /// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
557    /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
558    pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
559        match self.namespace() {
560            LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
561            LogNamespace::Legacy => log_schema().source_type_key_target_path(),
562        }
563    }
564
565    /// Fetches the `message` of the event. This is either from the "message" semantic meaning (Vector namespace)
566    /// or from the message key set on the "Global Log Schema" (Legacy namespace).
567    pub fn get_message(&self) -> Option<&Value> {
568        match self.namespace() {
569            LogNamespace::Vector => self.get_by_meaning("message"),
570            LogNamespace::Legacy => log_schema()
571                .message_key_target_path()
572                .and_then(|key| self.get(key)),
573        }
574    }
575
576    /// Fetches the `timestamp` of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
577    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
578    pub fn get_timestamp(&self) -> Option<&Value> {
579        match self.namespace() {
580            LogNamespace::Vector => self.get_by_meaning("timestamp"),
581            LogNamespace::Legacy => log_schema()
582                .timestamp_key_target_path()
583                .and_then(|key| self.get(key)),
584        }
585    }
586
587    /// Removes the `timestamp` from the event. This is either from the "timestamp" semantic meaning (Vector namespace)
588    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
589    pub fn remove_timestamp(&mut self) -> Option<Value> {
590        self.timestamp_path()
591            .cloned()
592            .and_then(|key| self.remove(&key))
593    }
594
595    /// Fetches the `host` of the event. This is either from the "host" semantic meaning (Vector namespace)
596    /// or from the host key set on the "Global Log Schema" (Legacy namespace).
597    pub fn get_host(&self) -> Option<&Value> {
598        match self.namespace() {
599            LogNamespace::Vector => self.get_by_meaning("host"),
600            LogNamespace::Legacy => log_schema()
601                .host_key_target_path()
602                .and_then(|key| self.get(key)),
603        }
604    }
605
606    /// Fetches the `source_type` of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
607    /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
608    pub fn get_source_type(&self) -> Option<&Value> {
609        match self.namespace() {
610            LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
611            LogNamespace::Legacy => log_schema()
612                .source_type_key_target_path()
613                .and_then(|key| self.get(key)),
614        }
615    }
616}
617
618impl MaybeAsLogMut for LogEvent {
619    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
620        Some(self)
621    }
622}
623
624impl EventDataEq for LogEvent {
625    fn event_data_eq(&self, other: &Self) -> bool {
626        self.inner.fields == other.inner.fields && self.metadata.event_data_eq(&other.metadata)
627    }
628}
629
#[cfg(any(test, feature = "test"))]
mod test_utils {
    use super::{Bytes, LogEvent, Utc, log_schema};

    // These conversions rely on the global log schema, which is not supported under the
    // `LogNamespace::Vector` namespace. The tests using them exercise the `Legacy` namespace.
    // As those tests are updated, they should migrate away from these impls so it is
    // explicit which namespace is in use.

    impl From<Bytes> for LogEvent {
        /// Builds a `Legacy` event with `message` at the schema's message key and the current
        /// UTC time at its timestamp key (each only when that key is configured).
        fn from(message: Bytes) -> Self {
            let mut event = Self::default();
            event.maybe_insert(log_schema().message_key_target_path(), message);
            if let Some(timestamp_path) = log_schema().timestamp_key_target_path() {
                event.insert(timestamp_path, Utc::now());
            }
            event
        }
    }

    impl From<&str> for LogEvent {
        fn from(message: &str) -> Self {
            Self::from(String::from(message))
        }
    }

    impl From<String> for LogEvent {
        fn from(message: String) -> Self {
            Self::from(Bytes::from(message))
        }
    }
}
663
664impl From<Value> for LogEvent {
665    fn from(value: Value) -> Self {
666        Self::from_parts(value, EventMetadata::default())
667    }
668}
669
670impl From<ObjectMap> for LogEvent {
671    fn from(map: ObjectMap) -> Self {
672        Self::from_parts(Value::Object(map), EventMetadata::default())
673    }
674}
675
676impl From<HashMap<KeyString, Value>> for LogEvent {
677    fn from(map: HashMap<KeyString, Value>) -> Self {
678        Self::from_parts(
679            Value::Object(map.into_iter().collect::<ObjectMap>()),
680            EventMetadata::default(),
681        )
682    }
683}
684
685impl TryFrom<serde_json::Value> for LogEvent {
686    type Error = crate::Error;
687
688    fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
689        match map {
690            serde_json::Value::Object(fields) => Ok(LogEvent::from(
691                fields
692                    .into_iter()
693                    .map(|(k, v)| (k.into(), v.into()))
694                    .collect::<ObjectMap>(),
695            )),
696            _ => Err(crate::Error::from(
697                "Attempted to convert non-Object JSON into a LogEvent.",
698            )),
699        }
700    }
701}
702
703impl TryInto<serde_json::Value> for LogEvent {
704    type Error = crate::Error;
705
706    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
707        Ok(serde_json::to_value(&self.inner.fields)?)
708    }
709}
710
#[cfg(any(test, feature = "test"))]
impl<T> std::ops::Index<T> for LogEvent
where
    T: AsRef<str>,
{
    type Output = Value;

    /// Test-only lookup by path string.
    ///
    /// # Panics
    /// Panics if `key` fails to parse or no value exists at that path.
    fn index(&self, key: T) -> &Value {
        let path = key.as_ref();
        match self.parse_path_and_get_value(path) {
            Ok(Some(value)) => value,
            Ok(None) | Err(_) => panic!("Key is not found: {:?}", path),
        }
    }
}
725
726impl<K, V> Extend<(K, V)> for LogEvent
727where
728    K: AsRef<str>,
729    V: Into<Value>,
730{
731    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
732        for (k, v) in iter {
733            if let Ok(path) = parse_target_path(k.as_ref()) {
734                self.insert(&path, v.into());
735            }
736        }
737    }
738}
739
740// Allow converting any kind of appropriate key/value iterator directly into a LogEvent.
741impl<K: AsRef<str>, V: Into<Value>> FromIterator<(K, V)> for LogEvent {
742    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
743        let mut log_event = Self::default();
744        log_event.extend(iter);
745        log_event
746    }
747}
748
749impl Serialize for LogEvent {
750    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
751    where
752        S: Serializer,
753    {
754        self.value().serialize(serializer)
755    }
756}
757
758// Tracing owned target paths used for tracing to log event conversions.
759struct TracingTargetPaths {
760    pub(crate) timestamp: OwnedTargetPath,
761    pub(crate) kind: OwnedTargetPath,
762    pub(crate) module_path: OwnedTargetPath,
763    pub(crate) level: OwnedTargetPath,
764    pub(crate) target: OwnedTargetPath,
765}
766
767/// Lazily initialized singleton.
768static TRACING_TARGET_PATHS: LazyLock<TracingTargetPaths> = LazyLock::new(|| TracingTargetPaths {
769    timestamp: OwnedTargetPath::event(owned_value_path!("timestamp")),
770    kind: OwnedTargetPath::event(owned_value_path!("metadata", "kind")),
771    level: OwnedTargetPath::event(owned_value_path!("metadata", "level")),
772    module_path: OwnedTargetPath::event(owned_value_path!("metadata", "module_path")),
773    target: OwnedTargetPath::event(owned_value_path!("metadata", "target")),
774});
775
776impl From<&tracing::Event<'_>> for LogEvent {
777    fn from(event: &tracing::Event<'_>) -> Self {
778        let now = chrono::Utc::now();
779        let mut maker = LogEvent::default();
780        event.record(&mut maker);
781
782        let mut log = maker;
783        log.insert(&TRACING_TARGET_PATHS.timestamp, now);
784
785        let meta = event.metadata();
786        log.insert(
787            &TRACING_TARGET_PATHS.kind,
788            if meta.is_event() {
789                Value::Bytes("event".to_string().into())
790            } else if meta.is_span() {
791                Value::Bytes("span".to_string().into())
792            } else {
793                Value::Null
794            },
795        );
796        log.insert(&TRACING_TARGET_PATHS.level, meta.level().to_string());
797        log.insert(
798            &TRACING_TARGET_PATHS.module_path,
799            meta.module_path()
800                .map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
801        );
802        log.insert(&TRACING_TARGET_PATHS.target, meta.target().to_string());
803        log
804    }
805}
806
807/// Note that `tracing::field::Field` containing dots and other special characters will be treated as a single segment.
808impl tracing::field::Visit for LogEvent {
809    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
810        self.insert(event_path!(field.name()), value.to_string());
811    }
812
813    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
814        self.insert(event_path!(field.name()), format!("{value:?}"));
815    }
816
817    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
818        self.insert(event_path!(field.name()), value);
819    }
820
821    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
822        let field_path = event_path!(field.name());
823        let converted: Result<i64, _> = value.try_into();
824        match converted {
825            Ok(value) => self.insert(field_path, value),
826            Err(_) => self.insert(field_path, value.to_string()),
827        };
828    }
829
830    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
831        self.insert(event_path!(field.name()), value);
832    }
833}
834
#[cfg(test)]
mod test {
    use lookup::event_path;
    use uuid::Version;
    use vrl::{btreemap, value};

    use super::*;
    use crate::test_util::open_fixture;

    // The following two tests assert that renaming a key has no effect if the
    // keys are equivalent, whether the key exists in the log or not.
    #[test]
    fn rename_key_flat_equiv_exists() {
        let value = value!({
            one: 1,
            two: 2
        });

        let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
        base.rename_key(event_path!("one"), event_path!("one"));
        let (actual_fields, _) = base.into_parts();

        assert_eq!(value, actual_fields);
    }

    #[test]
    fn rename_key_flat_equiv_not_exists() {
        let value = value!({
            one: 1,
            two: 2
        });

        let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
        base.rename_key(event_path!("three"), event_path!("three"));
        let (actual_fields, _) = base.into_parts();

        assert_eq!(value, actual_fields);
    }

    // Assert that renaming a key has no effect if the key does not originally
    // exist in the log, even when the `from` and `to` keys differ.
    #[test]
    fn rename_key_flat_not_exists() {
        let value = value!({
            one: 1,
            two: 2
        });

        let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
        base.rename_key(event_path!("three"), event_path!("four"));
        let (actual_fields, _) = base.into_parts();

        assert_eq!(value, actual_fields);
    }

    // Assert that renaming a key has the effect of moving the value from one
    // key name to another if the key exists.
    #[test]
    fn rename_key_flat_no_overlap() {
        let value = value!({
            one: 1,
            two: 2
        });

        let mut expected_value = value.clone();
        let one = expected_value.remove("one", true).unwrap();
        expected_value.insert("three", one);

        let mut base = LogEvent::from_parts(value, EventMetadata::default());
        base.rename_key(event_path!("one"), event_path!("three"));
        let (actual_fields, _) = base.into_parts();

        assert_eq!(expected_value, actual_fields);
    }

    // Assert that renaming a key has the effect of moving the value from one
    // key name to another if the key exists, overwriting the destination key
    // if it already exists.
    #[test]
    fn rename_key_flat_overlap() {
        let value = value!({
            one: 1,
            two: 2
        });

        let mut expected_value = value.clone();
        let val = expected_value.remove("one", true).unwrap();
        expected_value.insert("two", val);

        let mut base = LogEvent::from_parts(value, EventMetadata::default());
        base.rename_key(event_path!("one"), event_path!("two"));
        let (actual_value, _) = base.into_parts();

        assert_eq!(expected_value, actual_value);
    }

    #[test]
    fn insert() {
        let mut log = LogEvent::default();

        let old = log.insert("foo", "foo");

        assert_eq!(log.get("foo"), Some(&"foo".into()));
        assert_eq!(old, None);
    }

    #[test]
    fn insert_existing() {
        let mut log = LogEvent::default();
        log.insert("foo", "foo");

        let old = log.insert("foo", "bar");

        assert_eq!(log.get("foo"), Some(&"bar".into()));
        assert_eq!(old, Some("foo".into()));
    }

    #[test]
    fn try_insert() {
        let mut log = LogEvent::default();

        log.try_insert("foo", "foo");

        assert_eq!(log.get("foo"), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_existing() {
        let mut log = LogEvent::default();
        log.insert("foo", "foo");

        log.try_insert("foo", "bar");

        assert_eq!(log.get("foo"), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_dotted() {
        let mut log = LogEvent::default();

        log.try_insert("foo.bar", "foo");

        assert_eq!(log.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(log.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_existing_dotted() {
        let mut log = LogEvent::default();
        log.insert("foo.bar", "foo");

        log.try_insert("foo.bar", "bar");

        assert_eq!(log.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(log.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_flat() {
        let mut log = LogEvent::default();

        log.try_insert(event_path!("foo"), "foo");

        assert_eq!(log.get(event_path!("foo")), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_flat_existing() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "foo");

        log.try_insert(event_path!("foo"), "bar");

        assert_eq!(log.get(event_path!("foo")), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_flat_dotted() {
        let mut log = LogEvent::default();

        log.try_insert(event_path!("foo.bar"), "foo");

        assert_eq!(log.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(log.get("foo.bar"), None);
    }

    #[test]
    fn try_insert_flat_existing_dotted() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo.bar"), "foo");

        log.try_insert(event_path!("foo.bar"), "bar");

        assert_eq!(log.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(log.get("foo.bar"), None);
    }

    // This test iterates over the `tests/data/fixtures/log_event` folder and,
    // for every fixture:
    //
    //   * Converts the fixture's parsed JSON into a `LogEvent` and back into a
    //     `serde_json::Value`, then checks the result equals the fixture as
    //     originally parsed.
    //
    // Basically: this test makes sure we aren't mutilating any content users
    // might be sending.
    #[test]
    fn json_value_to_vector_log_event_to_json_value() {
        const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event";

        let mut checked = 0usize;
        for fixture_file in std::fs::read_dir(FIXTURE_ROOT).unwrap() {
            let fixture_file =
                fixture_file.expect("This test should never read Err'ing test fixtures.");
            let path = fixture_file.path();
            tracing::trace!(?path, "Opening.");
            let serde_value = open_fixture(&path).unwrap();

            let vector_value = LogEvent::try_from(serde_value.clone()).unwrap();
            let serde_value_again: serde_json::Value = vector_value.try_into().unwrap();

            // Name the offending fixture so a failure is actionable.
            assert_eq!(
                serde_value,
                serde_value_again,
                "round trip changed fixture {}",
                path.display()
            );
            checked += 1;
        }

        // Without this, an empty fixture directory would make the test pass
        // without checking anything.
        assert!(checked > 0, "no fixtures found in {FIXTURE_ROOT}");
    }

    fn assert_merge_value(
        current: impl Into<Value>,
        incoming: impl Into<Value>,
        expected: impl Into<Value>,
    ) {
        let mut merged = current.into();
        merged.merge(incoming.into());
        assert_eq!(merged, expected.into());
    }

    #[test]
    fn merge_value_works_correctly() {
        assert_merge_value("hello ", "world", "hello world");

        assert_merge_value(true, false, false);
        assert_merge_value(false, true, true);

        assert_merge_value("my_val", true, true);
        assert_merge_value(true, "my_val", "my_val");

        assert_merge_value(1, 2, 2);
    }

    #[test]
    fn merge_event_combines_values_accordingly() {
        // Specify the fields that will be merged.
        // Only the ones listed will be merged from the `incoming` event
        // to the `current`.
        let fields_to_merge = vec![
            "merge".to_string(),
            "merge_a".to_string(),
            "merge_b".to_string(),
            "merge_c".to_string(),
        ];

        let current = {
            let mut log = LogEvent::default();

            log.insert("merge", "hello "); // will be concatenated with the `merge` from `incoming`.
            log.insert("do_not_merge", "my_first_value"); // will remain as is, since it's not selected for merging.

            log.insert("merge_a", true); // selected for merging, but `incoming` has no `merge_a`, so it remains `true`.
            log.insert("merge_b", 123i64); // will be overwritten with the `merge_b` from `incoming` (since it's a non-bytes kind).

            log.insert("a", true); // will remain as is since it's not selected for merge.
            log.insert("b", 123i64); // will remain as is since it's not selected for merge.

            // `c` is not present in the `current`, and not selected for merge,
            // so it won't be included in the final event.

            log
        };

        let incoming = {
            let mut log = LogEvent::default();

            log.insert("merge", "world"); // will be concatenated to the `merge` from `current`.
            log.insert("do_not_merge", "my_second_value"); // will be ignored, since it's not selected for merge.

            log.insert("merge_b", 456i64); // will replace the `merge_b` from `current`.
            log.insert("merge_c", false); // absent from `current`, so it will be added as `false`.

            // `a` will remain as-is, since it's not marked for merge and
            // neither is it specified in the `incoming` event.
            log.insert("b", 456i64); // `b` not marked for merge, will not change.
            log.insert("c", true); // `c` not marked for merge, will be ignored.

            log
        };

        let mut merged = current;
        merged.merge(incoming, &fields_to_merge);

        let expected = {
            let mut log = LogEvent::default();
            log.insert("merge", "hello world");
            log.insert("do_not_merge", "my_first_value");
            log.insert("a", true);
            log.insert("b", 123i64);
            log.insert("merge_a", true);
            log.insert("merge_b", 456i64);
            log.insert("merge_c", false);
            log
        };

        vector_common::assert_event_data_eq!(merged, expected);
    }

    #[test]
    fn event_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("a", 0);
        // Inserting the nested path `a.b` replaces the scalar previously at `a`.
        log.insert("a.b", 1);
        log.insert("c", 2);
        let actual: Vec<(KeyString, Value)> = log
            .all_event_fields()
            .unwrap()
            .map(|(s, v)| (s, v.clone()))
            .collect();
        assert_eq!(
            actual,
            vec![("a.b".into(), 1.into()), ("c".into(), 2.into())]
        );
    }

    #[test]
    fn metadata_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("%a", 0);
        // Inserting the nested path `%a.b` replaces the scalar previously at `%a`.
        log.insert("%a.b", 1);
        log.insert("%c", 2);
        let actual: Vec<(KeyString, Value)> = log
            .all_metadata_fields()
            .unwrap()
            .map(|(s, v)| (s, v.clone()))
            .collect();
        assert_eq!(
            actual,
            vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())]
        );
    }

    #[test]
    fn skip_array_elements() {
        let log = LogEvent::from(Value::from(btreemap! {
            "arr" => [1],
            "obj" => btreemap! {
                "arr" => [1,2,3]
            },
        }));

        let actual: Vec<(KeyString, Value)> = log
            .all_event_fields_skip_array_elements()
            .unwrap()
            .map(|(s, v)| (s, v.clone()))
            .collect();
        assert_eq!(
            actual,
            vec![
                ("arr".into(), [1].into()),
                ("obj.arr".into(), [1, 2, 3].into())
            ]
        );
    }

    #[test]
    fn metadata_set_unique_uuid_v7_source_event_id() {
        // Check if event id is UUID v7
        let log1 = LogEvent::default();
        assert_eq!(
            log1.metadata()
                .source_event_id()
                .expect("source_event_id should be auto-generated for new events")
                .get_version(),
            Some(Version::SortRand)
        );

        // Check if event id is unique on creation
        let log2 = LogEvent::default();
        assert_ne!(
            log1.metadata().source_event_id(),
            log2.metadata().source_event_id()
        );
    }
}