vector_core/event/
log_event.rs

1use std::{
2    collections::HashMap,
3    convert::{TryFrom, TryInto},
4    fmt::Debug,
5    iter::FromIterator,
6    mem::size_of,
7    num::NonZeroUsize,
8    sync::{Arc, LazyLock},
9};
10
11use crate::event::util::log::all_fields_skip_array_elements;
12use bytes::Bytes;
13use chrono::Utc;
14
15use crossbeam_utils::atomic::AtomicCell;
16use lookup::{lookup_v2::TargetPath, metadata_path, path, PathPrefix};
17use serde::{Deserialize, Serialize, Serializer};
18use vector_common::{
19    byte_size_of::ByteSizeOf,
20    internal_event::{OptionalTag, TaggedEventsSent},
21    json_size::{JsonSize, NonZeroJsonSize},
22    request_metadata::GetEventCountTags,
23    EventDataEq,
24};
25use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};
26use vrl::{event_path, owned_value_path};
27
28use super::{
29    estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
30    finalization::{BatchNotifier, EventFinalizer},
31    metadata::EventMetadata,
32    util, EventFinalizers, Finalizable, KeyString, ObjectMap, Value,
33};
34use crate::config::LogNamespace;
35use crate::config::{log_schema, telemetry};
36use crate::event::util::log::{all_fields, all_metadata_fields};
37use crate::event::MaybeAsLogMut;
38
39static VECTOR_SOURCE_TYPE_PATH: LazyLock<Option<OwnedTargetPath>> = LazyLock::new(|| {
40    Some(OwnedTargetPath::metadata(owned_value_path!(
41        "vector",
42        "source_type"
43    )))
44});
45
/// Shared payload of a `LogEvent`: the event value together with two lazily
/// filled size caches.
#[derive(Debug, Deserialize)]
struct Inner {
    /// The event data itself; (de)serialized flattened into the parent.
    #[serde(flatten)]
    fields: Value,

    /// Cached result of `ByteSizeOf::size_of`. `None` until first computed
    /// and again after `invalidate` is called.
    #[serde(skip)]
    size_cache: AtomicCell<Option<NonZeroUsize>>,

    /// Cached result of `estimated_json_encoded_size_of`. `None` until first
    /// computed and again after `invalidate` is called.
    #[serde(skip)]
    json_encoded_size_cache: AtomicCell<Option<NonZeroJsonSize>>,
}
57
58impl Inner {
59    fn invalidate(&self) {
60        self.size_cache.store(None);
61        self.json_encoded_size_cache.store(None);
62    }
63
64    fn as_value(&self) -> &Value {
65        &self.fields
66    }
67}
68
69impl ByteSizeOf for Inner {
70    fn size_of(&self) -> usize {
71        self.size_cache
72            .load()
73            .unwrap_or_else(|| {
74                let size = size_of::<Self>() + self.allocated_bytes();
75                // The size of self will always be non-zero, and
76                // adding the allocated bytes cannot make it overflow
77                // since `usize` has a range the same as pointer
78                // space. Hence, the expect below cannot fail.
79                let size = NonZeroUsize::new(size).expect("Size cannot be zero");
80                self.size_cache.store(Some(size));
81                size
82            })
83            .into()
84    }
85
86    fn allocated_bytes(&self) -> usize {
87        self.fields.allocated_bytes()
88    }
89}
90
91impl EstimatedJsonEncodedSizeOf for Inner {
92    fn estimated_json_encoded_size_of(&self) -> JsonSize {
93        self.json_encoded_size_cache
94            .load()
95            .unwrap_or_else(|| {
96                let size = self.fields.estimated_json_encoded_size_of();
97                let size = NonZeroJsonSize::new(size).expect("Size cannot be zero");
98
99                self.json_encoded_size_cache.store(Some(size));
100                size
101            })
102            .into()
103    }
104}
105
106impl Clone for Inner {
107    fn clone(&self) -> Self {
108        Self {
109            fields: self.fields.clone(),
110            // This clone is only ever used in combination with
111            // `Arc::make_mut`, so don't bother fetching the size
112            // cache to copy it since it will be invalidated anyways.
113            size_cache: None.into(),
114
115            // This clone is only ever used in combination with
116            // `Arc::make_mut`, so don't bother fetching the size
117            // cache to copy it since it will be invalidated anyways.
118            json_encoded_size_cache: None.into(),
119        }
120    }
121}
122
123impl Default for Inner {
124    fn default() -> Self {
125        Self {
126            // **IMPORTANT:** Due to numerous legacy reasons this **must** be a Map variant.
127            fields: Value::Object(Default::default()),
128            size_cache: Default::default(),
129            json_encoded_size_cache: Default::default(),
130        }
131    }
132}
133
134impl From<Value> for Inner {
135    fn from(fields: Value) -> Self {
136        Self {
137            fields,
138            size_cache: Default::default(),
139            json_encoded_size_cache: Default::default(),
140        }
141    }
142}
143
144impl PartialEq for Inner {
145    fn eq(&self, other: &Self) -> bool {
146        self.fields.eq(&other.fields)
147    }
148}
149
/// A log event: a reference-counted event value plus its `EventMetadata`.
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct LogEvent {
    /// Event fields and their size caches, shared behind an `Arc`; mutation
    /// goes through `Arc::make_mut` (see `value_mut`).
    #[serde(flatten)]
    inner: Arc<Inner>,

    /// Metadata attached to the event; skipped by serde.
    #[serde(skip)]
    metadata: EventMetadata,
}
158
159impl LogEvent {
160    /// This used to be the implementation for `LogEvent::from(&'str)`, but this is now only
161    /// valid for `LogNamespace::Legacy`
162    pub fn from_str_legacy(msg: impl Into<String>) -> Self {
163        let mut log = LogEvent::default();
164        log.maybe_insert(log_schema().message_key_target_path(), msg.into());
165
166        if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
167            log.insert(timestamp_key, Utc::now());
168        }
169
170        log
171    }
172
173    /// This used to be the implementation for `LogEvent::from(Bytes)`, but this is now only
174    /// valid for `LogNamespace::Legacy`
175    pub fn from_bytes_legacy(msg: &Bytes) -> Self {
176        Self::from_str_legacy(String::from_utf8_lossy(msg.as_ref()).to_string())
177    }
178
179    pub fn value(&self) -> &Value {
180        self.inner.as_ref().as_value()
181    }
182
183    pub fn value_mut(&mut self) -> &mut Value {
184        let result = Arc::make_mut(&mut self.inner);
185        // We MUST invalidate the inner size cache when making a
186        // mutable copy, since the _next_ action will modify the data.
187        result.invalidate();
188        &mut result.fields
189    }
190
191    pub fn metadata(&self) -> &EventMetadata {
192        &self.metadata
193    }
194
195    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
196        &mut self.metadata
197    }
198
199    /// This detects the log namespace used at runtime by checking for the existence
200    /// of the read-only "vector" metadata, which only exists (and is required to exist)
201    /// with the `Vector` log namespace.
202    pub fn namespace(&self) -> LogNamespace {
203        if self.contains((PathPrefix::Metadata, path!("vector"))) {
204            LogNamespace::Vector
205        } else {
206            LogNamespace::Legacy
207        }
208    }
209}
210
211impl ByteSizeOf for LogEvent {
212    fn allocated_bytes(&self) -> usize {
213        self.inner.size_of() + self.metadata.allocated_bytes()
214    }
215}
216
217impl Finalizable for LogEvent {
218    fn take_finalizers(&mut self) -> EventFinalizers {
219        self.metadata.take_finalizers()
220    }
221}
222
223impl EstimatedJsonEncodedSizeOf for LogEvent {
224    fn estimated_json_encoded_size_of(&self) -> JsonSize {
225        self.inner.estimated_json_encoded_size_of()
226    }
227}
228
229impl GetEventCountTags for LogEvent {
230    fn get_tags(&self) -> TaggedEventsSent {
231        let source = if telemetry().tags().emit_source {
232            self.metadata().source_id().cloned().into()
233        } else {
234            OptionalTag::Ignored
235        };
236
237        let service = if telemetry().tags().emit_service {
238            self.get_by_meaning("service")
239                .map(|value| value.to_string_lossy().to_string())
240                .into()
241        } else {
242            OptionalTag::Ignored
243        };
244
245        TaggedEventsSent { source, service }
246    }
247}
248
249impl LogEvent {
250    #[must_use]
251    pub fn new_with_metadata(metadata: EventMetadata) -> Self {
252        Self {
253            inner: Default::default(),
254            metadata,
255        }
256    }
257
258    ///  Create a `LogEvent` from a `Value` and `EventMetadata`
259    pub fn from_parts(value: Value, metadata: EventMetadata) -> Self {
260        Self {
261            inner: Arc::new(value.into()),
262            metadata,
263        }
264    }
265
266    ///  Create a `LogEvent` from an `ObjectMap` and `EventMetadata`
267    pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self {
268        let inner = Arc::new(Inner::from(Value::Object(map)));
269        Self { inner, metadata }
270    }
271
272    /// Convert a `LogEvent` into a tuple of its components
273    pub fn into_parts(mut self) -> (Value, EventMetadata) {
274        self.value_mut();
275
276        let value = Arc::try_unwrap(self.inner)
277            .unwrap_or_else(|_| unreachable!("inner fields already cloned after owning"))
278            .fields;
279        let metadata = self.metadata;
280        (value, metadata)
281    }
282
283    #[must_use]
284    pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
285        self.metadata = self.metadata.with_batch_notifier(batch);
286        self
287    }
288
289    #[must_use]
290    pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
291        self.metadata = self.metadata.with_batch_notifier_option(batch);
292        self
293    }
294
295    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
296        self.metadata.add_finalizer(finalizer);
297    }
298
299    /// Parse the specified `path` and if there are no parsing errors, attempt to get a reference to a value.
300    /// # Errors
301    /// Will return an error if path parsing failed.
302    pub fn parse_path_and_get_value(
303        &self,
304        path: impl AsRef<str>,
305    ) -> Result<Option<&Value>, PathParseError> {
306        parse_target_path(path.as_ref()).map(|path| self.get(&path))
307    }
308
309    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
310    pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
311        match key.prefix() {
312            PathPrefix::Event => self.inner.fields.get(key.value_path()),
313            PathPrefix::Metadata => self.metadata.value().get(key.value_path()),
314        }
315    }
316
317    /// Retrieves the value of a field based on it's meaning.
318    /// This will first check if the value has previously been dropped. It is worth being
319    /// aware that if the field has been dropped and then somehow re-added, we still fetch
320    /// the dropped value here.
321    pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
322        self.metadata().dropped_field(&meaning).or_else(|| {
323            self.metadata()
324                .schema_definition()
325                .meaning_path(meaning.as_ref())
326                .and_then(|path| self.get(path))
327        })
328    }
329
330    /// Retrieves the mutable value of a field based on it's meaning.
331    /// Note that this does _not_ check the dropped fields, unlike `get_by_meaning`, since the
332    /// purpose of the mutable reference is to be able to modify the value and modifying the dropped
333    /// fields has no effect on the resulting event.
334    pub fn get_mut_by_meaning(&mut self, meaning: impl AsRef<str>) -> Option<&mut Value> {
335        Arc::clone(self.metadata.schema_definition())
336            .meaning_path(meaning.as_ref())
337            .and_then(|path| self.get_mut(path))
338    }
339
340    /// Retrieves the target path of a field based on the specified `meaning`.
341    pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
342        self.metadata()
343            .schema_definition()
344            .meaning_path(meaning.as_ref())
345    }
346
347    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
348    pub fn get_mut<'a>(&mut self, path: impl TargetPath<'a>) -> Option<&mut Value> {
349        match path.prefix() {
350            PathPrefix::Event => self.value_mut().get_mut(path.value_path()),
351            PathPrefix::Metadata => self.metadata.value_mut().get_mut(path.value_path()),
352        }
353    }
354
355    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
356    pub fn contains<'a>(&self, path: impl TargetPath<'a>) -> bool {
357        match path.prefix() {
358            PathPrefix::Event => self.value().contains(path.value_path()),
359            PathPrefix::Metadata => self.metadata.value().contains(path.value_path()),
360        }
361    }
362
363    /// Parse the specified `path` and if there are no parsing errors, attempt to insert the specified `value`.
364    ///
365    /// # Errors
366    /// Will return an error if path parsing failed.
367    pub fn parse_path_and_insert(
368        &mut self,
369        path: impl AsRef<str>,
370        value: impl Into<Value>,
371    ) -> Result<Option<Value>, PathParseError> {
372        let target_path = parse_target_path(path.as_ref())?;
373        Ok(self.insert(&target_path, value))
374    }
375
376    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
377    pub fn insert<'a>(
378        &mut self,
379        path: impl TargetPath<'a>,
380        value: impl Into<Value>,
381    ) -> Option<Value> {
382        match path.prefix() {
383            PathPrefix::Event => self.value_mut().insert(path.value_path(), value.into()),
384            PathPrefix::Metadata => self
385                .metadata
386                .value_mut()
387                .insert(path.value_path(), value.into()),
388        }
389    }
390
391    pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
392        if let Some(path) = path {
393            self.insert(path, value);
394        }
395    }
396
397    // deprecated - using this means the schema is unknown
398    pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into<Value>) {
399        if !self.contains(path.clone()) {
400            self.insert(path, value);
401        }
402    }
403
404    /// Rename a key
405    ///
406    /// If `to_key` already exists in the structure its value will be overwritten.
407    pub fn rename_key<'a>(&mut self, from: impl TargetPath<'a>, to: impl TargetPath<'a>) {
408        if let Some(val) = self.remove(from) {
409            self.insert(to, val);
410        }
411    }
412
413    pub fn remove<'a>(&mut self, path: impl TargetPath<'a>) -> Option<Value> {
414        self.remove_prune(path, false)
415    }
416
417    #[allow(clippy::needless_pass_by_value)] // TargetPath is always a reference
418    pub fn remove_prune<'a>(&mut self, path: impl TargetPath<'a>, prune: bool) -> Option<Value> {
419        match path.prefix() {
420            PathPrefix::Event => self.value_mut().remove(path.value_path(), prune),
421            PathPrefix::Metadata => self.metadata.value_mut().remove(path.value_path(), prune),
422        }
423    }
424
425    pub fn keys(&self) -> Option<impl Iterator<Item = KeyString> + '_> {
426        match &self.inner.fields {
427            Value::Object(map) => Some(util::log::keys(map)),
428            _ => None,
429        }
430    }
431
432    /// If the event root value is a map, build and return an iterator to event field and value pairs.
433    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
434    pub fn all_event_fields(
435        &self,
436    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
437        self.as_map().map(all_fields)
438    }
439
440    /// Similar to [`LogEvent::all_event_fields`], but doesn't traverse individual array elements.
441    pub fn all_event_fields_skip_array_elements(
442        &self,
443    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
444        self.as_map().map(all_fields_skip_array_elements)
445    }
446
447    /// If the metadata root value is a map, build and return an iterator to metadata field and value pairs.
448    /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
449    pub fn all_metadata_fields(
450        &self,
451    ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
452        match self.metadata.value() {
453            Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
454            _ => None,
455        }
456    }
457
458    /// Returns an iterator of all fields if the value is an Object. Otherwise, a single field is
459    /// returned with a "message" key. Field names that are could be interpreted as alternate paths
460    /// (i.e. containing periods, square brackets, etc) are quoted.
461    pub fn convert_to_fields(&self) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
462        if let Some(map) = self.as_map() {
463            util::log::all_fields(map)
464        } else {
465            util::log::all_fields_non_object_root(self.value())
466        }
467    }
468
469    /// Returns an iterator of all fields if the value is an Object. Otherwise, a single field is
470    /// returned with a "message" key. Field names are not quoted.
471    pub fn convert_to_fields_unquoted(
472        &self,
473    ) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
474        if let Some(map) = self.as_map() {
475            util::log::all_fields_unquoted(map)
476        } else {
477            util::log::all_fields_non_object_root(self.value())
478        }
479    }
480
481    pub fn is_empty_object(&self) -> bool {
482        if let Some(map) = self.as_map() {
483            map.is_empty()
484        } else {
485            false
486        }
487    }
488
489    pub fn as_map(&self) -> Option<&ObjectMap> {
490        match self.value() {
491            Value::Object(map) => Some(map),
492            _ => None,
493        }
494    }
495
496    pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> {
497        match self.value_mut() {
498            Value::Object(map) => Some(map),
499            _ => None,
500        }
501    }
502
503    /// Merge all fields specified at `fields` from `incoming` to `current`.
504    /// Note that `fields` containing dots and other special characters will be treated as a single segment.
505    pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
506        for field in fields {
507            let field_path = event_path!(field.as_ref());
508            let Some(incoming_val) = incoming.remove(field_path) else {
509                continue;
510            };
511            match self.get_mut(field_path) {
512                None => {
513                    self.insert(field_path, incoming_val);
514                }
515                Some(current_val) => current_val.merge(incoming_val),
516            }
517        }
518        self.metadata.merge(incoming.metadata);
519    }
520}
521
522/// Log Namespace utility methods. These can only be used when an event has a
523/// valid schema definition set (which should be on every event in transforms and sinks).
524impl LogEvent {
525    /// Fetches the "message" path of the event. This is either from the "message" semantic meaning (Vector namespace)
526    /// or from the message key set on the "Global Log Schema" (Legacy namespace).
527    pub fn message_path(&self) -> Option<&OwnedTargetPath> {
528        match self.namespace() {
529            LogNamespace::Vector => self.find_key_by_meaning("message"),
530            LogNamespace::Legacy => log_schema().message_key_target_path(),
531        }
532    }
533
534    /// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
535    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
536    pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
537        match self.namespace() {
538            LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
539            LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
540        }
541    }
542
543    /// Fetches the `host` path of the event. This is either from the "host" semantic meaning (Vector namespace)
544    /// or from the host key set on the "Global Log Schema" (Legacy namespace).
545    pub fn host_path(&self) -> Option<&OwnedTargetPath> {
546        match self.namespace() {
547            LogNamespace::Vector => self.find_key_by_meaning("host"),
548            LogNamespace::Legacy => log_schema().host_key_target_path(),
549        }
550    }
551
552    /// Fetches the `source_type` path of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
553    /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
554    pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
555        match self.namespace() {
556            LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
557            LogNamespace::Legacy => log_schema().source_type_key_target_path(),
558        }
559    }
560
561    /// Fetches the `message` of the event. This is either from the "message" semantic meaning (Vector namespace)
562    /// or from the message key set on the "Global Log Schema" (Legacy namespace).
563    pub fn get_message(&self) -> Option<&Value> {
564        match self.namespace() {
565            LogNamespace::Vector => self.get_by_meaning("message"),
566            LogNamespace::Legacy => log_schema()
567                .message_key_target_path()
568                .and_then(|key| self.get(key)),
569        }
570    }
571
572    /// Fetches the `timestamp` of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
573    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
574    pub fn get_timestamp(&self) -> Option<&Value> {
575        match self.namespace() {
576            LogNamespace::Vector => self.get_by_meaning("timestamp"),
577            LogNamespace::Legacy => log_schema()
578                .timestamp_key_target_path()
579                .and_then(|key| self.get(key)),
580        }
581    }
582
583    /// Removes the `timestamp` from the event. This is either from the "timestamp" semantic meaning (Vector namespace)
584    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
585    pub fn remove_timestamp(&mut self) -> Option<Value> {
586        self.timestamp_path()
587            .cloned()
588            .and_then(|key| self.remove(&key))
589    }
590
591    /// Fetches the `host` of the event. This is either from the "host" semantic meaning (Vector namespace)
592    /// or from the host key set on the "Global Log Schema" (Legacy namespace).
593    pub fn get_host(&self) -> Option<&Value> {
594        match self.namespace() {
595            LogNamespace::Vector => self.get_by_meaning("host"),
596            LogNamespace::Legacy => log_schema()
597                .host_key_target_path()
598                .and_then(|key| self.get(key)),
599        }
600    }
601
602    /// Fetches the `source_type` of the event. This is either from the `source_type` Vector metadata field (Vector namespace)
603    /// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
604    pub fn get_source_type(&self) -> Option<&Value> {
605        match self.namespace() {
606            LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
607            LogNamespace::Legacy => log_schema()
608                .source_type_key_target_path()
609                .and_then(|key| self.get(key)),
610        }
611    }
612}
613
614impl MaybeAsLogMut for LogEvent {
615    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
616        Some(self)
617    }
618}
619
620impl EventDataEq for LogEvent {
621    fn event_data_eq(&self, other: &Self) -> bool {
622        self.inner.fields == other.inner.fields && self.metadata.event_data_eq(&other.metadata)
623    }
624}
625
#[cfg(any(test, feature = "test"))]
mod test_utils {
    use super::{log_schema, Bytes, LogEvent, Utc};

    // These conversions rely on the global log schema, which is no longer
    // supported when using the "LogNamespace::Vector" namespace.
    // The tests that rely on them are testing the "Legacy" log namespace. As
    // those tests are updated, they should be migrated away from these
    // implementations to make it clearer which namespace is being used.

    impl From<Bytes> for LogEvent {
        /// Stores `message` under the schema's message key (if configured) and the
        /// current UTC time under its timestamp key (if configured).
        fn from(message: Bytes) -> Self {
            let mut event = LogEvent::default();
            event.maybe_insert(log_schema().message_key_target_path(), message);

            if let Some(timestamp_path) = log_schema().timestamp_key_target_path() {
                event.insert(timestamp_path, Utc::now());
            }

            event
        }
    }

    impl From<&str> for LogEvent {
        /// Copies the string and converts it through `From<String>`.
        fn from(message: &str) -> Self {
            Self::from(message.to_owned())
        }
    }

    impl From<String> for LogEvent {
        /// Converts the string to `Bytes` and goes through `From<Bytes>`.
        fn from(message: String) -> Self {
            Self::from(Bytes::from(message))
        }
    }
}
659
660impl From<Value> for LogEvent {
661    fn from(value: Value) -> Self {
662        Self::from_parts(value, EventMetadata::default())
663    }
664}
665
666impl From<ObjectMap> for LogEvent {
667    fn from(map: ObjectMap) -> Self {
668        Self::from_parts(Value::Object(map), EventMetadata::default())
669    }
670}
671
672impl From<HashMap<KeyString, Value>> for LogEvent {
673    fn from(map: HashMap<KeyString, Value>) -> Self {
674        Self::from_parts(
675            Value::Object(map.into_iter().collect::<ObjectMap>()),
676            EventMetadata::default(),
677        )
678    }
679}
680
681impl TryFrom<serde_json::Value> for LogEvent {
682    type Error = crate::Error;
683
684    fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
685        match map {
686            serde_json::Value::Object(fields) => Ok(LogEvent::from(
687                fields
688                    .into_iter()
689                    .map(|(k, v)| (k.into(), v.into()))
690                    .collect::<ObjectMap>(),
691            )),
692            _ => Err(crate::Error::from(
693                "Attempted to convert non-Object JSON into a LogEvent.",
694            )),
695        }
696    }
697}
698
699impl TryInto<serde_json::Value> for LogEvent {
700    type Error = crate::Error;
701
702    fn try_into(self) -> Result<serde_json::Value, Self::Error> {
703        Ok(serde_json::to_value(&self.inner.fields)?)
704    }
705}
706
#[cfg(any(test, feature = "test"))]
impl<T> std::ops::Index<T> for LogEvent
where
    T: AsRef<str>,
{
    type Output = Value;

    /// Test-only indexing by path string.
    ///
    /// # Panics
    /// Panics when `key` fails to parse as a target path or no value is stored there.
    fn index(&self, key: T) -> &Value {
        match self.parse_path_and_get_value(key.as_ref()) {
            Ok(Some(value)) => value,
            Ok(None) | Err(_) => panic!("Key is not found: {:?}", key.as_ref()),
        }
    }
}
721
722impl<K, V> Extend<(K, V)> for LogEvent
723where
724    K: AsRef<str>,
725    V: Into<Value>,
726{
727    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
728        for (k, v) in iter {
729            if let Ok(path) = parse_target_path(k.as_ref()) {
730                self.insert(&path, v.into());
731            }
732        }
733    }
734}
735
736// Allow converting any kind of appropriate key/value iterator directly into a LogEvent.
737impl<K: AsRef<str>, V: Into<Value>> FromIterator<(K, V)> for LogEvent {
738    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
739        let mut log_event = Self::default();
740        log_event.extend(iter);
741        log_event
742    }
743}
744
745impl Serialize for LogEvent {
746    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
747    where
748        S: Serializer,
749    {
750        self.value().serialize(serializer)
751    }
752}
753
// Owned target paths used when converting a `tracing::Event` into a `LogEvent`.
struct TracingTargetPaths {
    // Where the conversion timestamp is stored.
    pub(crate) timestamp: OwnedTargetPath,
    // Where the "event"/"span" kind of the tracing metadata is stored.
    pub(crate) kind: OwnedTargetPath,
    // Where the tracing metadata's module path is stored.
    pub(crate) module_path: OwnedTargetPath,
    // Where the tracing metadata's level is stored.
    pub(crate) level: OwnedTargetPath,
    // Where the tracing metadata's target is stored.
    pub(crate) target: OwnedTargetPath,
}
762
763/// Lazily initialized singleton.
764static TRACING_TARGET_PATHS: LazyLock<TracingTargetPaths> = LazyLock::new(|| TracingTargetPaths {
765    timestamp: OwnedTargetPath::event(owned_value_path!("timestamp")),
766    kind: OwnedTargetPath::event(owned_value_path!("metadata", "kind")),
767    level: OwnedTargetPath::event(owned_value_path!("metadata", "level")),
768    module_path: OwnedTargetPath::event(owned_value_path!("metadata", "module_path")),
769    target: OwnedTargetPath::event(owned_value_path!("metadata", "target")),
770});
771
772impl From<&tracing::Event<'_>> for LogEvent {
773    fn from(event: &tracing::Event<'_>) -> Self {
774        let now = chrono::Utc::now();
775        let mut maker = LogEvent::default();
776        event.record(&mut maker);
777
778        let mut log = maker;
779        log.insert(&TRACING_TARGET_PATHS.timestamp, now);
780
781        let meta = event.metadata();
782        log.insert(
783            &TRACING_TARGET_PATHS.kind,
784            if meta.is_event() {
785                Value::Bytes("event".to_string().into())
786            } else if meta.is_span() {
787                Value::Bytes("span".to_string().into())
788            } else {
789                Value::Null
790            },
791        );
792        log.insert(&TRACING_TARGET_PATHS.level, meta.level().to_string());
793        log.insert(
794            &TRACING_TARGET_PATHS.module_path,
795            meta.module_path()
796                .map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
797        );
798        log.insert(&TRACING_TARGET_PATHS.target, meta.target().to_string());
799        log
800    }
801}
802
803/// Note that `tracing::field::Field` containing dots and other special characters will be treated as a single segment.
804impl tracing::field::Visit for LogEvent {
805    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
806        self.insert(event_path!(field.name()), value.to_string());
807    }
808
809    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
810        self.insert(event_path!(field.name()), format!("{value:?}"));
811    }
812
813    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
814        self.insert(event_path!(field.name()), value);
815    }
816
817    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
818        let field_path = event_path!(field.name());
819        let converted: Result<i64, _> = value.try_into();
820        match converted {
821            Ok(value) => self.insert(field_path, value),
822            Err(_) => self.insert(field_path, value.to_string()),
823        };
824    }
825
826    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
827        self.insert(event_path!(field.name()), value);
828    }
829}
830
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::open_fixture;
    use lookup::event_path;
    use uuid::Version;
    use vrl::{btreemap, value};

    // Flat two-field object shared by all of the `rename_key_*` tests.
    fn flat_fixture() -> Value {
        value!({
            one: 1,
            two: 2
        })
    }

    // Renaming a key onto itself must leave the fields untouched when the key
    // is present in the log.
    #[test]
    fn rename_key_flat_equiv_exists() {
        let original = flat_fixture();

        let mut log = LogEvent::from_parts(original.clone(), EventMetadata::default());
        log.rename_key(event_path!("one"), event_path!("one"));
        let renamed = log.into_parts().0;

        assert_eq!(original, renamed);
    }

    // Renaming a key onto itself must leave the fields untouched when the key
    // is absent from the log.
    #[test]
    fn rename_key_flat_equiv_not_exists() {
        let original = flat_fixture();

        let mut log = LogEvent::from_parts(original.clone(), EventMetadata::default());
        log.rename_key(event_path!("three"), event_path!("three"));
        let renamed = log.into_parts().0;

        assert_eq!(original, renamed);
    }

    // Renaming a key that is absent from the log must leave the fields
    // untouched even when the `from` and `to` keys differ.
    #[test]
    fn rename_key_flat_not_exists() {
        let original = flat_fixture();

        let mut log = LogEvent::from_parts(original.clone(), EventMetadata::default());
        log.rename_key(event_path!("three"), event_path!("four"));
        let renamed = log.into_parts().0;

        assert_eq!(original, renamed);
    }

    // Renaming an existing key to an unused name moves the value to the new
    // key.
    #[test]
    fn rename_key_flat_no_overlap() {
        let original = flat_fixture();

        // Build the expectation by hand: take `one` out and store it as `three`.
        let mut expected = original.clone();
        let moved = expected.remove("one", true).unwrap();
        expected.insert("three", moved);

        let mut log = LogEvent::from_parts(original, EventMetadata::default());
        log.rename_key(event_path!("one"), event_path!("three"));
        let renamed = log.into_parts().0;

        assert_eq!(expected, renamed);
    }

    // Renaming an existing key onto another existing key moves the value and
    // overwrites whatever the destination key held.
    #[test]
    fn rename_key_flat_overlap() {
        let original = flat_fixture();

        // Build the expectation by hand: take `one` out and store it over `two`.
        let mut expected = original.clone();
        let moved = expected.remove("one", true).unwrap();
        expected.insert("two", moved);

        let mut log = LogEvent::from_parts(original, EventMetadata::default());
        log.rename_key(event_path!("one"), event_path!("two"));
        let renamed = log.into_parts().0;

        assert_eq!(expected, renamed);
    }

    // Inserting into an empty log stores the value and reports no previous one.
    #[test]
    fn insert() {
        let mut log = LogEvent::default();

        let previous = log.insert("foo", "foo");

        assert_eq!(log.get("foo"), Some(&Value::from("foo")));
        assert_eq!(previous, None);
    }

    // Inserting over an existing key replaces the value and returns the old one.
    #[test]
    fn insert_existing() {
        let mut log = LogEvent::default();
        log.insert("foo", "foo");

        let previous = log.insert("foo", "bar");

        assert_eq!(log.get("foo"), Some(&Value::from("bar")));
        assert_eq!(previous, Some(Value::from("foo")));
    }

    // `try_insert` stores the value when the key is not yet present.
    #[test]
    fn try_insert() {
        let mut log = LogEvent::default();

        log.try_insert("foo", "foo");

        assert_eq!(log.get("foo"), Some(&Value::from("foo")));
    }

    // `try_insert` keeps the existing value when the key is already present.
    #[test]
    fn try_insert_existing() {
        let mut log = LogEvent::default();
        log.insert("foo", "foo");

        log.try_insert("foo", "bar");

        assert_eq!(log.get("foo"), Some(&Value::from("foo")));
    }

    // A dotted string path and the single-segment `event_path!("foo.bar")`
    // address different locations: writing through the former is not visible
    // through the latter.
    #[test]
    fn try_insert_dotted() {
        let mut log = LogEvent::default();

        log.try_insert("foo.bar", "foo");

        assert_eq!(log.get("foo.bar"), Some(&Value::from("foo")));
        assert_eq!(log.get(event_path!("foo.bar")), None);
    }

    // Same as `try_insert_dotted`, but the dotted path already holds a value,
    // which `try_insert` must not replace.
    #[test]
    fn try_insert_existing_dotted() {
        let mut log = LogEvent::default();
        log.insert("foo.bar", "foo");

        log.try_insert("foo.bar", "bar");

        assert_eq!(log.get("foo.bar"), Some(&Value::from("foo")));
        assert_eq!(log.get(event_path!("foo.bar")), None);
    }

    // `try_insert` through a single-segment `event_path!` stores the value when
    // the key is not yet present.
    #[test]
    fn try_insert_flat() {
        let mut log = LogEvent::default();

        log.try_insert(event_path!("foo"), "foo");

        assert_eq!(log.get(event_path!("foo")), Some(&Value::from("foo")));
    }

    // `try_insert` through a single-segment `event_path!` keeps the existing
    // value when the key is already present.
    #[test]
    fn try_insert_flat_existing() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "foo");

        log.try_insert(event_path!("foo"), "bar");

        assert_eq!(log.get(event_path!("foo")), Some(&Value::from("foo")));
    }

    // Writing through the single-segment `event_path!("foo.bar")` is not
    // visible through the dotted string path `"foo.bar"`.
    #[test]
    fn try_insert_flat_dotted() {
        let mut log = LogEvent::default();

        log.try_insert(event_path!("foo.bar"), "foo");

        assert_eq!(log.get(event_path!("foo.bar")), Some(&Value::from("foo")));
        assert_eq!(log.get("foo.bar"), None);
    }

    // Same as `try_insert_flat_dotted`, but the single-segment key already
    // holds a value, which `try_insert` must not replace.
    #[test]
    fn try_insert_flat_existing_dotted() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo.bar"), "foo");

        log.try_insert(event_path!("foo.bar"), "bar");

        assert_eq!(log.get(event_path!("foo.bar")), Some(&Value::from("foo")));
        assert_eq!(log.get("foo.bar"), None);
    }

    // Walks every entry of the `tests/data/fixtures/log_event` folder and
    // checks that a fixture converted into a `LogEvent` and back into a
    // `serde_json::Value` equals the fixture as originally loaded.
    //
    // In short: this guards against mutilating any content users might be
    // sending.
    #[test]
    fn json_value_to_vector_log_event_to_json_value() {
        const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event";

        for entry in std::fs::read_dir(FIXTURE_ROOT).unwrap() {
            let Ok(entry) = entry else {
                panic!("This test should never read Err'ing test fixtures.");
            };

            let path = entry.path();
            tracing::trace!(?path, "Opening.");
            let serde_value = open_fixture(&path).unwrap();

            let log = LogEvent::try_from(serde_value.clone()).unwrap();
            let round_tripped: serde_json::Value = log.try_into().unwrap();

            assert_eq!(serde_value, round_tripped);
        }
    }

    // Merges `incoming` into `current` and asserts the result equals `expected`.
    fn assert_merge_value(
        current: impl Into<Value>,
        incoming: impl Into<Value>,
        expected: impl Into<Value>,
    ) {
        let mut merged: Value = current.into();
        let addition: Value = incoming.into();
        merged.merge(addition);

        let wanted: Value = expected.into();
        assert_eq!(merged, wanted);
    }

    #[test]
    fn merge_value_works_correctly() {
        // Two strings are concatenated.
        assert_merge_value("hello ", "world", "hello world");

        // Booleans: the incoming value wins.
        assert_merge_value(true, false, false);
        assert_merge_value(false, true, true);

        // Mixed kinds: the incoming value wins.
        assert_merge_value("my_val", true, true);
        assert_merge_value(true, "my_val", "my_val");

        // Integers: the incoming value wins.
        assert_merge_value(1, 2, 2);
    }

    #[test]
    fn merge_event_combines_values_accordingly() {
        // Only the fields listed here are merged from the `incoming` event
        // into the `current` one.
        let fields_to_merge: Vec<String> = ["merge", "merge_a", "merge_b", "merge_c"]
            .into_iter()
            .map(String::from)
            .collect();

        // The `current` event; it is merged into in place further down.
        let mut merged = LogEvent::default();
        merged.insert("merge", "hello "); // concatenated with `merge` from `incoming`.
        merged.insert("do_not_merge", "my_first_value"); // not selected for merging, stays as is.
        merged.insert("merge_a", true); // selected, but `incoming` has no `merge_a`: stays `true`.
        merged.insert("merge_b", 123i64); // selected, replaced by `merge_b` from `incoming`.
        merged.insert("a", true); // not selected for merging, stays as is.
        merged.insert("b", 123i64); // not selected for merging, stays as is.
        // `c` is neither present here nor selected for merging, so it must not
        // show up in the final event.

        let mut incoming = LogEvent::default();
        incoming.insert("merge", "world"); // appended to `merge` from `current`.
        incoming.insert("do_not_merge", "my_second_value"); // not selected, ignored.
        incoming.insert("merge_b", 456i64); // merged in as `456`.
        incoming.insert("merge_c", false); // merged in as `false`.
        // `a` is absent here and not selected, so `current` keeps its own value.
        incoming.insert("b", 456i64); // not selected, `b` must not change.
        incoming.insert("c", true); // not selected, ignored.

        merged.merge(incoming, &fields_to_merge);

        let mut expected = LogEvent::default();
        expected.insert("merge", "hello world");
        expected.insert("do_not_merge", "my_first_value");
        expected.insert("a", true);
        expected.insert("b", 123i64);
        expected.insert("merge_a", true);
        expected.insert("merge_b", 456i64);
        expected.insert("merge_c", false);

        vector_common::assert_event_data_eq!(merged, expected);
    }

    // `all_event_fields` is expected to yield `a.b` and `c` only: after `a.b`
    // is inserted, `a` itself is not listed as a field.
    #[test]
    fn event_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("a", 0);
        log.insert("a.b", 1);
        log.insert("c", 2);

        let fields = log.all_event_fields().unwrap();
        let actual: Vec<(KeyString, Value)> = fields
            .map(|(key, field_value)| (key, field_value.clone()))
            .collect();

        let expected = vec![("a.b".into(), 1.into()), ("c".into(), 2.into())];
        assert_eq!(actual, expected);
    }

    // Metadata counterpart of `event_fields_iter`, using `%`-prefixed paths.
    #[test]
    fn metadata_fields_iter() {
        let mut log = LogEvent::default();
        log.insert("%a", 0);
        log.insert("%a.b", 1);
        log.insert("%c", 2);

        let fields = log.all_metadata_fields().unwrap();
        let actual: Vec<(KeyString, Value)> = fields
            .map(|(key, field_value)| (key, field_value.clone()))
            .collect();

        let expected = vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())];
        assert_eq!(actual, expected);
    }

    // `all_event_fields_skip_array_elements` is expected to yield each array as
    // one whole value rather than one entry per element.
    #[test]
    fn skip_array_elements() {
        let log = LogEvent::from(Value::from(btreemap! {
            "arr" => [1],
            "obj" => btreemap! {
                "arr" => [1,2,3]
            },
        }));

        let fields = log.all_event_fields_skip_array_elements().unwrap();
        let actual: Vec<(KeyString, Value)> = fields
            .map(|(key, field_value)| (key, field_value.clone()))
            .collect();

        let expected = vec![
            ("arr".into(), [1].into()),
            ("obj.arr".into(), [1, 2, 3].into()),
        ];
        assert_eq!(actual, expected);
    }

    #[test]
    fn metadata_set_unique_uuid_v7_source_event_id() {
        // A freshly created event must carry a UUID v7 as its source event id.
        let first = LogEvent::default();
        let version = first
            .metadata()
            .source_event_id()
            .expect("source_event_id should be auto-generated for new events")
            .get_version();
        assert_eq!(version, Some(Version::SortRand));

        // Two independently created events must not share a source event id.
        let second = LogEvent::default();
        assert_ne!(
            first.metadata().source_event_id(),
            second.metadata().source_event_id()
        );
    }
}