1use std::{
2 collections::HashMap,
3 convert::{TryFrom, TryInto},
4 fmt::Debug,
5 iter::FromIterator,
6 mem::size_of,
7 num::NonZeroUsize,
8 sync::{Arc, LazyLock},
9};
10
11use bytes::Bytes;
12use chrono::Utc;
13use crossbeam_utils::atomic::AtomicCell;
14use lookup::{PathPrefix, lookup_v2::TargetPath, metadata_path, path};
15use serde::{Deserialize, Serialize, Serializer};
16use vector_common::{
17 EventDataEq,
18 byte_size_of::ByteSizeOf,
19 internal_event::{OptionalTag, TaggedEventsSent},
20 json_size::{JsonSize, NonZeroJsonSize},
21 request_metadata::GetEventCountTags,
22};
23use vrl::{
24 event_path, owned_value_path,
25 path::{OwnedTargetPath, PathParseError, parse_target_path},
26};
27
28use super::{
29 EventFinalizers, Finalizable, KeyString, ObjectMap, Value,
30 estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
31 finalization::{BatchNotifier, EventFinalizer},
32 metadata::EventMetadata,
33 util,
34};
35use crate::{
36 config::{LogNamespace, log_schema, telemetry},
37 event::{
38 MaybeAsLogMut,
39 util::log::{all_fields, all_fields_skip_array_elements, all_metadata_fields},
40 },
41};
42
43static VECTOR_SOURCE_TYPE_PATH: LazyLock<Option<OwnedTargetPath>> = LazyLock::new(|| {
44 Some(OwnedTargetPath::metadata(owned_value_path!(
45 "vector",
46 "source_type"
47 )))
48});
49
/// Payload held behind the `Arc` in [`LogEvent`]: the event's field data plus two
/// lazily-filled size caches.
#[derive(Debug, Deserialize)]
struct Inner {
    /// The event data itself.
    #[serde(flatten)]
    fields: Value,

    /// Cached result of `ByteSizeOf::size_of`. `None` until first computed and again after
    /// `invalidate` is called. Skipped by serde.
    #[serde(skip)]
    size_cache: AtomicCell<Option<NonZeroUsize>>,

    /// Cached result of `estimated_json_encoded_size_of`. Reset together with `size_cache`.
    /// Skipped by serde.
    #[serde(skip)]
    json_encoded_size_cache: AtomicCell<Option<NonZeroJsonSize>>,
}
61
62impl Inner {
63 fn invalidate(&self) {
64 self.size_cache.store(None);
65 self.json_encoded_size_cache.store(None);
66 }
67
68 fn as_value(&self) -> &Value {
69 &self.fields
70 }
71}
72
73impl ByteSizeOf for Inner {
74 fn size_of(&self) -> usize {
75 self.size_cache
76 .load()
77 .unwrap_or_else(|| {
78 let size = size_of::<Self>() + self.allocated_bytes();
79 let size = NonZeroUsize::new(size).expect("Size cannot be zero");
84 self.size_cache.store(Some(size));
85 size
86 })
87 .into()
88 }
89
90 fn allocated_bytes(&self) -> usize {
91 self.fields.allocated_bytes()
92 }
93}
94
95impl EstimatedJsonEncodedSizeOf for Inner {
96 fn estimated_json_encoded_size_of(&self) -> JsonSize {
97 self.json_encoded_size_cache
98 .load()
99 .unwrap_or_else(|| {
100 let size = self.fields.estimated_json_encoded_size_of();
101 let size = NonZeroJsonSize::new(size).expect("Size cannot be zero");
102
103 self.json_encoded_size_cache.store(Some(size));
104 size
105 })
106 .into()
107 }
108}
109
110impl Clone for Inner {
111 fn clone(&self) -> Self {
112 Self {
113 fields: self.fields.clone(),
114 size_cache: None.into(),
118
119 json_encoded_size_cache: None.into(),
123 }
124 }
125}
126
127impl Default for Inner {
128 fn default() -> Self {
129 Self {
130 fields: Value::Object(Default::default()),
132 size_cache: Default::default(),
133 json_encoded_size_cache: Default::default(),
134 }
135 }
136}
137
138impl From<Value> for Inner {
139 fn from(fields: Value) -> Self {
140 Self {
141 fields,
142 size_cache: Default::default(),
143 json_encoded_size_cache: Default::default(),
144 }
145 }
146}
147
148impl PartialEq for Inner {
149 fn eq(&self, other: &Self) -> bool {
150 self.fields.eq(&other.fields)
151 }
152}
153
/// A log event: structured field data plus the event's metadata.
///
/// Only the field data is deserialized; `metadata` is skipped by serde and therefore takes
/// its default value when a `LogEvent` is deserialized.
#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
pub struct LogEvent {
    /// Field data, held behind an `Arc`. Mutable access goes through `Arc::make_mut`
    /// (see `value_mut`).
    #[serde(flatten)]
    inner: Arc<Inner>,

    /// Metadata carried alongside the field data.
    #[serde(skip)]
    metadata: EventMetadata,
}
162
163impl LogEvent {
164 pub fn from_str_legacy(msg: impl Into<String>) -> Self {
167 let mut log = LogEvent::default();
168 log.maybe_insert(log_schema().message_key_target_path(), msg.into());
169
170 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
171 log.insert(timestamp_key, Utc::now());
172 }
173
174 log
175 }
176
177 pub fn from_bytes_legacy(msg: &Bytes) -> Self {
180 Self::from_str_legacy(String::from_utf8_lossy(msg.as_ref()).to_string())
181 }
182
183 pub fn value(&self) -> &Value {
184 self.inner.as_ref().as_value()
185 }
186
187 pub fn value_mut(&mut self) -> &mut Value {
188 let result = Arc::make_mut(&mut self.inner);
189 result.invalidate();
192 &mut result.fields
193 }
194
195 pub fn metadata(&self) -> &EventMetadata {
196 &self.metadata
197 }
198
199 pub fn metadata_mut(&mut self) -> &mut EventMetadata {
200 &mut self.metadata
201 }
202
203 pub fn namespace(&self) -> LogNamespace {
207 if self.contains((PathPrefix::Metadata, path!("vector"))) {
208 LogNamespace::Vector
209 } else {
210 LogNamespace::Legacy
211 }
212 }
213}
214
215impl ByteSizeOf for LogEvent {
216 fn allocated_bytes(&self) -> usize {
217 self.inner.size_of() + self.metadata.allocated_bytes()
218 }
219}
220
impl Finalizable for LogEvent {
    /// Takes the finalizers out of this event's metadata (delegates to
    /// `EventMetadata::take_finalizers`).
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.metadata.take_finalizers()
    }
}
226
impl EstimatedJsonEncodedSizeOf for LogEvent {
    /// Estimated JSON-encoded size of the field data only; metadata is not included.
    /// The figure is cached inside `Inner`.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        self.inner.estimated_json_encoded_size_of()
    }
}
232
233impl GetEventCountTags for LogEvent {
234 fn get_tags(&self) -> TaggedEventsSent {
235 let source = if telemetry().tags().emit_source {
236 self.metadata().source_id().cloned().into()
237 } else {
238 OptionalTag::Ignored
239 };
240
241 let service = if telemetry().tags().emit_service {
242 self.get_by_meaning("service")
243 .map(|value| value.to_string_lossy().to_string())
244 .into()
245 } else {
246 OptionalTag::Ignored
247 };
248
249 TaggedEventsSent { source, service }
250 }
251}
252
253impl LogEvent {
254 #[must_use]
255 pub fn new_with_metadata(metadata: EventMetadata) -> Self {
256 Self {
257 inner: Default::default(),
258 metadata,
259 }
260 }
261
262 pub fn from_parts(value: Value, metadata: EventMetadata) -> Self {
264 Self {
265 inner: Arc::new(value.into()),
266 metadata,
267 }
268 }
269
270 pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self {
272 let inner = Arc::new(Inner::from(Value::Object(map)));
273 Self { inner, metadata }
274 }
275
276 pub fn into_parts(mut self) -> (Value, EventMetadata) {
278 self.value_mut();
279
280 let value = Arc::try_unwrap(self.inner)
281 .unwrap_or_else(|_| unreachable!("inner fields already cloned after owning"))
282 .fields;
283 let metadata = self.metadata;
284 (value, metadata)
285 }
286
287 #[must_use]
288 pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
289 self.metadata = self.metadata.with_batch_notifier(batch);
290 self
291 }
292
293 #[must_use]
294 pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
295 self.metadata = self.metadata.with_batch_notifier_option(batch);
296 self
297 }
298
299 pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
300 self.metadata.add_finalizer(finalizer);
301 }
302
303 pub fn parse_path_and_get_value(
307 &self,
308 path: impl AsRef<str>,
309 ) -> Result<Option<&Value>, PathParseError> {
310 parse_target_path(path.as_ref()).map(|path| self.get(&path))
311 }
312
313 #[allow(clippy::needless_pass_by_value)] pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
315 match key.prefix() {
316 PathPrefix::Event => self.inner.fields.get(key.value_path()),
317 PathPrefix::Metadata => self.metadata.value().get(key.value_path()),
318 }
319 }
320
321 pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
326 self.metadata().dropped_field(&meaning).or_else(|| {
327 self.metadata()
328 .schema_definition()
329 .meaning_path(meaning.as_ref())
330 .and_then(|path| self.get(path))
331 })
332 }
333
334 pub fn get_mut_by_meaning(&mut self, meaning: impl AsRef<str>) -> Option<&mut Value> {
339 Arc::clone(self.metadata.schema_definition())
340 .meaning_path(meaning.as_ref())
341 .and_then(|path| self.get_mut(path))
342 }
343
344 pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
346 self.metadata()
347 .schema_definition()
348 .meaning_path(meaning.as_ref())
349 }
350
351 #[allow(clippy::needless_pass_by_value)] pub fn get_mut<'a>(&mut self, path: impl TargetPath<'a>) -> Option<&mut Value> {
353 match path.prefix() {
354 PathPrefix::Event => self.value_mut().get_mut(path.value_path()),
355 PathPrefix::Metadata => self.metadata.value_mut().get_mut(path.value_path()),
356 }
357 }
358
359 #[allow(clippy::needless_pass_by_value)] pub fn contains<'a>(&self, path: impl TargetPath<'a>) -> bool {
361 match path.prefix() {
362 PathPrefix::Event => self.value().contains(path.value_path()),
363 PathPrefix::Metadata => self.metadata.value().contains(path.value_path()),
364 }
365 }
366
367 pub fn parse_path_and_insert(
372 &mut self,
373 path: impl AsRef<str>,
374 value: impl Into<Value>,
375 ) -> Result<Option<Value>, PathParseError> {
376 let target_path = parse_target_path(path.as_ref())?;
377 Ok(self.insert(&target_path, value))
378 }
379
380 #[allow(clippy::needless_pass_by_value)] pub fn insert<'a>(
382 &mut self,
383 path: impl TargetPath<'a>,
384 value: impl Into<Value>,
385 ) -> Option<Value> {
386 match path.prefix() {
387 PathPrefix::Event => self.value_mut().insert(path.value_path(), value.into()),
388 PathPrefix::Metadata => self
389 .metadata
390 .value_mut()
391 .insert(path.value_path(), value.into()),
392 }
393 }
394
395 pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
396 if let Some(path) = path {
397 self.insert(path, value);
398 }
399 }
400
401 pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into<Value>) {
403 if !self.contains(path.clone()) {
404 self.insert(path, value);
405 }
406 }
407
408 pub fn rename_key<'a>(&mut self, from: impl TargetPath<'a>, to: impl TargetPath<'a>) {
412 if let Some(val) = self.remove(from) {
413 self.insert(to, val);
414 }
415 }
416
417 pub fn remove<'a>(&mut self, path: impl TargetPath<'a>) -> Option<Value> {
418 self.remove_prune(path, false)
419 }
420
421 #[allow(clippy::needless_pass_by_value)] pub fn remove_prune<'a>(&mut self, path: impl TargetPath<'a>, prune: bool) -> Option<Value> {
423 match path.prefix() {
424 PathPrefix::Event => self.value_mut().remove(path.value_path(), prune),
425 PathPrefix::Metadata => self.metadata.value_mut().remove(path.value_path(), prune),
426 }
427 }
428
429 pub fn keys(&self) -> Option<impl Iterator<Item = KeyString> + '_> {
430 match &self.inner.fields {
431 Value::Object(map) => Some(util::log::keys(map)),
432 _ => None,
433 }
434 }
435
436 pub fn all_event_fields(
439 &self,
440 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
441 self.as_map().map(all_fields)
442 }
443
444 pub fn all_event_fields_skip_array_elements(
446 &self,
447 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
448 self.as_map().map(all_fields_skip_array_elements)
449 }
450
451 pub fn all_metadata_fields(
454 &self,
455 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
456 match self.metadata.value() {
457 Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
458 _ => None,
459 }
460 }
461
462 pub fn convert_to_fields(&self) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
466 if let Some(map) = self.as_map() {
467 util::log::all_fields(map)
468 } else {
469 util::log::all_fields_non_object_root(self.value())
470 }
471 }
472
473 pub fn convert_to_fields_unquoted(
476 &self,
477 ) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
478 if let Some(map) = self.as_map() {
479 util::log::all_fields_unquoted(map)
480 } else {
481 util::log::all_fields_non_object_root(self.value())
482 }
483 }
484
485 pub fn is_empty_object(&self) -> bool {
486 if let Some(map) = self.as_map() {
487 map.is_empty()
488 } else {
489 false
490 }
491 }
492
493 pub fn as_map(&self) -> Option<&ObjectMap> {
494 match self.value() {
495 Value::Object(map) => Some(map),
496 _ => None,
497 }
498 }
499
500 pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> {
501 match self.value_mut() {
502 Value::Object(map) => Some(map),
503 _ => None,
504 }
505 }
506
507 pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
510 for field in fields {
511 let field_path = event_path!(field.as_ref());
512 let Some(incoming_val) = incoming.remove(field_path) else {
513 continue;
514 };
515 match self.get_mut(field_path) {
516 None => {
517 self.insert(field_path, incoming_val);
518 }
519 Some(current_val) => current_val.merge(incoming_val),
520 }
521 }
522 self.metadata.merge(incoming.metadata);
523 }
524}
525
526impl LogEvent {
529 pub fn message_path(&self) -> Option<&OwnedTargetPath> {
532 match self.namespace() {
533 LogNamespace::Vector => self.find_key_by_meaning("message"),
534 LogNamespace::Legacy => log_schema().message_key_target_path(),
535 }
536 }
537
538 pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
541 match self.namespace() {
542 LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
543 LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
544 }
545 }
546
547 pub fn host_path(&self) -> Option<&OwnedTargetPath> {
550 match self.namespace() {
551 LogNamespace::Vector => self.find_key_by_meaning("host"),
552 LogNamespace::Legacy => log_schema().host_key_target_path(),
553 }
554 }
555
556 pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
559 match self.namespace() {
560 LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
561 LogNamespace::Legacy => log_schema().source_type_key_target_path(),
562 }
563 }
564
565 pub fn get_message(&self) -> Option<&Value> {
568 match self.namespace() {
569 LogNamespace::Vector => self.get_by_meaning("message"),
570 LogNamespace::Legacy => log_schema()
571 .message_key_target_path()
572 .and_then(|key| self.get(key)),
573 }
574 }
575
576 pub fn get_timestamp(&self) -> Option<&Value> {
579 match self.namespace() {
580 LogNamespace::Vector => self.get_by_meaning("timestamp"),
581 LogNamespace::Legacy => log_schema()
582 .timestamp_key_target_path()
583 .and_then(|key| self.get(key)),
584 }
585 }
586
587 pub fn remove_timestamp(&mut self) -> Option<Value> {
590 self.timestamp_path()
591 .cloned()
592 .and_then(|key| self.remove(&key))
593 }
594
595 pub fn get_host(&self) -> Option<&Value> {
598 match self.namespace() {
599 LogNamespace::Vector => self.get_by_meaning("host"),
600 LogNamespace::Legacy => log_schema()
601 .host_key_target_path()
602 .and_then(|key| self.get(key)),
603 }
604 }
605
606 pub fn get_source_type(&self) -> Option<&Value> {
609 match self.namespace() {
610 LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
611 LogNamespace::Legacy => log_schema()
612 .source_type_key_target_path()
613 .and_then(|key| self.get(key)),
614 }
615 }
616}
617
impl MaybeAsLogMut for LogEvent {
    /// A `LogEvent` is always a log, so this always returns `Some(self)`.
    fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
        Some(self)
    }
}
623
624impl EventDataEq for LogEvent {
625 fn event_data_eq(&self, other: &Self) -> bool {
626 self.inner.fields == other.inner.fields && self.metadata.event_data_eq(&other.metadata)
627 }
628}
629
/// Convenience conversions that are only compiled for tests or with the `test` feature.
#[cfg(any(test, feature = "test"))]
mod test_utils {
    use super::{Bytes, LogEvent, Utc, log_schema};

    impl From<Bytes> for LogEvent {
        /// Stores `message` under the schema's message key and the current UTC time under
        /// the schema's timestamp key (each only when the key is configured).
        fn from(message: Bytes) -> Self {
            let mut event = Self::default();
            event.maybe_insert(log_schema().message_key_target_path(), message);
            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
                event.insert(timestamp_key, Utc::now());
            }
            event
        }
    }

    impl From<&str> for LogEvent {
        /// Copies the text and converts it via `From<String>`.
        fn from(message: &str) -> Self {
            Self::from(message.to_owned())
        }
    }

    impl From<String> for LogEvent {
        /// Converts the string into `Bytes` and then via `From<Bytes>`.
        fn from(message: String) -> Self {
            Self::from(Bytes::from(message))
        }
    }
}
663
664impl From<Value> for LogEvent {
665 fn from(value: Value) -> Self {
666 Self::from_parts(value, EventMetadata::default())
667 }
668}
669
670impl From<ObjectMap> for LogEvent {
671 fn from(map: ObjectMap) -> Self {
672 Self::from_parts(Value::Object(map), EventMetadata::default())
673 }
674}
675
676impl From<HashMap<KeyString, Value>> for LogEvent {
677 fn from(map: HashMap<KeyString, Value>) -> Self {
678 Self::from_parts(
679 Value::Object(map.into_iter().collect::<ObjectMap>()),
680 EventMetadata::default(),
681 )
682 }
683}
684
685impl TryFrom<serde_json::Value> for LogEvent {
686 type Error = crate::Error;
687
688 fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
689 match map {
690 serde_json::Value::Object(fields) => Ok(LogEvent::from(
691 fields
692 .into_iter()
693 .map(|(k, v)| (k.into(), v.into()))
694 .collect::<ObjectMap>(),
695 )),
696 _ => Err(crate::Error::from(
697 "Attempted to convert non-Object JSON into a LogEvent.",
698 )),
699 }
700 }
701}
702
703impl TryInto<serde_json::Value> for LogEvent {
704 type Error = crate::Error;
705
706 fn try_into(self) -> Result<serde_json::Value, Self::Error> {
707 Ok(serde_json::to_value(&self.inner.fields)?)
708 }
709}
710
/// Test-only indexing by path string, e.g. `event["foo.bar"]`.
#[cfg(any(test, feature = "test"))]
impl<T> std::ops::Index<T> for LogEvent
where
    T: AsRef<str>,
{
    type Output = Value;

    /// Returns the value at `key`.
    ///
    /// # Panics
    ///
    /// Panics when `key` fails to parse as a target path or no value exists at that path.
    fn index(&self, key: T) -> &Value {
        match self.parse_path_and_get_value(key.as_ref()) {
            Ok(Some(value)) => value,
            Ok(None) | Err(_) => panic!("Key is not found: {:?}", key.as_ref()),
        }
    }
}
725
726impl<K, V> Extend<(K, V)> for LogEvent
727where
728 K: AsRef<str>,
729 V: Into<Value>,
730{
731 fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
732 for (k, v) in iter {
733 if let Ok(path) = parse_target_path(k.as_ref()) {
734 self.insert(&path, v.into());
735 }
736 }
737 }
738}
739
740impl<K: AsRef<str>, V: Into<Value>> FromIterator<(K, V)> for LogEvent {
742 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
743 let mut log_event = Self::default();
744 log_event.extend(iter);
745 log_event
746 }
747}
748
749impl Serialize for LogEvent {
750 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
751 where
752 S: Serializer,
753 {
754 self.value().serialize(serializer)
755 }
756}
757
/// Target paths used when converting a `tracing::Event` into a [`LogEvent`]; built once in
/// `TRACING_TARGET_PATHS`.
struct TracingTargetPaths {
    /// Where the conversion time is stored.
    pub(crate) timestamp: OwnedTargetPath,
    /// Where the tracing metadata kind (`"event"` / `"span"`) is stored.
    pub(crate) kind: OwnedTargetPath,
    /// Where the tracing metadata's module path is stored.
    pub(crate) module_path: OwnedTargetPath,
    /// Where the tracing level is stored.
    pub(crate) level: OwnedTargetPath,
    /// Where the tracing target is stored.
    pub(crate) target: OwnedTargetPath,
}
766
767static TRACING_TARGET_PATHS: LazyLock<TracingTargetPaths> = LazyLock::new(|| TracingTargetPaths {
769 timestamp: OwnedTargetPath::event(owned_value_path!("timestamp")),
770 kind: OwnedTargetPath::event(owned_value_path!("metadata", "kind")),
771 level: OwnedTargetPath::event(owned_value_path!("metadata", "level")),
772 module_path: OwnedTargetPath::event(owned_value_path!("metadata", "module_path")),
773 target: OwnedTargetPath::event(owned_value_path!("metadata", "target")),
774});
775
776impl From<&tracing::Event<'_>> for LogEvent {
777 fn from(event: &tracing::Event<'_>) -> Self {
778 let now = chrono::Utc::now();
779 let mut maker = LogEvent::default();
780 event.record(&mut maker);
781
782 let mut log = maker;
783 log.insert(&TRACING_TARGET_PATHS.timestamp, now);
784
785 let meta = event.metadata();
786 log.insert(
787 &TRACING_TARGET_PATHS.kind,
788 if meta.is_event() {
789 Value::Bytes("event".to_string().into())
790 } else if meta.is_span() {
791 Value::Bytes("span".to_string().into())
792 } else {
793 Value::Null
794 },
795 );
796 log.insert(&TRACING_TARGET_PATHS.level, meta.level().to_string());
797 log.insert(
798 &TRACING_TARGET_PATHS.module_path,
799 meta.module_path()
800 .map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
801 );
802 log.insert(&TRACING_TARGET_PATHS.target, meta.target().to_string());
803 log
804 }
805}
806
807impl tracing::field::Visit for LogEvent {
809 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
810 self.insert(event_path!(field.name()), value.to_string());
811 }
812
813 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
814 self.insert(event_path!(field.name()), format!("{value:?}"));
815 }
816
817 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
818 self.insert(event_path!(field.name()), value);
819 }
820
821 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
822 let field_path = event_path!(field.name());
823 let converted: Result<i64, _> = value.try_into();
824 match converted {
825 Ok(value) => self.insert(field_path, value),
826 Err(_) => self.insert(field_path, value.to_string()),
827 };
828 }
829
830 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
831 self.insert(event_path!(field.name()), value);
832 }
833}
834
#[cfg(test)]
mod test {
    use lookup::event_path;
    use uuid::Version;
    use vrl::{btreemap, value};

    use super::*;
    use crate::test_util::open_fixture;

    // Renaming an existing key onto itself leaves the event unchanged.
    #[test]
    fn rename_key_flat_equiv_exists() {
        let original = value!({
            one: 1,
            two: 2
        });

        let mut event = LogEvent::from_parts(original.clone(), EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("one"));
        let (fields, _) = event.into_parts();

        assert_eq!(original, fields);
    }

    // Renaming a missing key onto itself leaves the event unchanged.
    #[test]
    fn rename_key_flat_equiv_not_exists() {
        let original = value!({
            one: 1,
            two: 2
        });

        let mut event = LogEvent::from_parts(original.clone(), EventMetadata::default());
        event.rename_key(event_path!("three"), event_path!("three"));
        let (fields, _) = event.into_parts();

        assert_eq!(original, fields);
    }

    // Renaming a missing key to another missing key leaves the event unchanged.
    #[test]
    fn rename_key_flat_not_exists() {
        let original = value!({
            one: 1,
            two: 2
        });

        let mut event = LogEvent::from_parts(original.clone(), EventMetadata::default());
        event.rename_key(event_path!("three"), event_path!("four"));
        let (fields, _) = event.into_parts();

        assert_eq!(original, fields);
    }

    // Renaming an existing key to an unused key moves the value.
    #[test]
    fn rename_key_flat_no_overlap() {
        let original = value!({
            one: 1,
            two: 2
        });

        let mut expected = original.clone();
        let moved = expected.remove("one", true).unwrap();
        expected.insert("three", moved);

        let mut event = LogEvent::from_parts(original, EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("three"));
        let (fields, _) = event.into_parts();

        assert_eq!(expected, fields);
    }

    // Renaming an existing key onto another existing key overwrites the destination.
    #[test]
    fn rename_key_flat_overlap() {
        let original = value!({
            one: 1,
            two: 2
        });

        let mut expected = original.clone();
        let moved = expected.remove("one", true).unwrap();
        expected.insert("two", moved);

        let mut event = LogEvent::from_parts(original, EventMetadata::default());
        event.rename_key(event_path!("one"), event_path!("two"));
        let (fields, _) = event.into_parts();

        assert_eq!(expected, fields);
    }

    // Inserting at an unused path stores the value and reports no previous value.
    #[test]
    fn insert() {
        let mut event = LogEvent::default();

        let previous = event.insert("foo", "foo");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
        assert_eq!(previous, None);
    }

    // Inserting at a used path replaces the value and returns the old one.
    #[test]
    fn insert_existing() {
        let mut event = LogEvent::default();
        event.insert("foo", "foo");

        let previous = event.insert("foo", "bar");

        assert_eq!(event.get("foo"), Some(&"bar".into()));
        assert_eq!(previous, Some("foo".into()));
    }

    #[test]
    fn try_insert() {
        let mut event = LogEvent::default();

        event.try_insert("foo", "foo");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
    }

    // `try_insert` must not overwrite an existing value.
    #[test]
    fn try_insert_existing() {
        let mut event = LogEvent::default();
        event.insert("foo", "foo");

        event.try_insert("foo", "bar");

        assert_eq!(event.get("foo"), Some(&"foo".into()));
    }

    // A string path "foo.bar" is nested, so the flat key "foo.bar" does not exist.
    #[test]
    fn try_insert_dotted() {
        let mut event = LogEvent::default();

        event.try_insert("foo.bar", "foo");

        assert_eq!(event.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(event.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_existing_dotted() {
        let mut event = LogEvent::default();
        event.insert("foo.bar", "foo");

        event.try_insert("foo.bar", "bar");

        assert_eq!(event.get("foo.bar"), Some(&"foo".into()));
        assert_eq!(event.get(event_path!("foo.bar")), None);
    }

    #[test]
    fn try_insert_flat() {
        let mut event = LogEvent::default();

        event.try_insert(event_path!("foo"), "foo");

        assert_eq!(event.get(event_path!("foo")), Some(&"foo".into()));
    }

    #[test]
    fn try_insert_flat_existing() {
        let mut event = LogEvent::default();
        event.insert(event_path!("foo"), "foo");

        event.try_insert(event_path!("foo"), "bar");

        assert_eq!(event.get(event_path!("foo")), Some(&"foo".into()));
    }

    // `event_path!("foo.bar")` is a single flat key, so the nested path "foo.bar" does
    // not exist.
    #[test]
    fn try_insert_flat_dotted() {
        let mut event = LogEvent::default();

        event.try_insert(event_path!("foo.bar"), "foo");

        assert_eq!(event.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(event.get("foo.bar"), None);
    }

    #[test]
    fn try_insert_flat_existing_dotted() {
        let mut event = LogEvent::default();
        event.insert(event_path!("foo.bar"), "foo");

        event.try_insert(event_path!("foo.bar"), "bar");

        assert_eq!(event.get(event_path!("foo.bar")), Some(&"foo".into()));
        assert_eq!(event.get("foo.bar"), None);
    }

    // Round trip: every JSON fixture must survive JSON -> LogEvent -> JSON unchanged.
    #[test]
    fn json_value_to_vector_log_event_to_json_value() {
        const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event";

        for fixture_file in std::fs::read_dir(FIXTURE_ROOT).unwrap() {
            let Ok(entry) = fixture_file else {
                panic!("This test should never read Err'ing test fixtures.");
            };

            let path = entry.path();
            tracing::trace!(?path, "Opening.");
            let serde_value = open_fixture(&path).unwrap();

            let vector_value = LogEvent::try_from(serde_value.clone()).unwrap();
            let serde_value_again: serde_json::Value = vector_value.try_into().unwrap();

            assert_eq!(serde_value, serde_value_again);
        }
    }

    // Merges `incoming` into `current` with `Value::merge` and checks the outcome.
    fn assert_merge_value(
        current: impl Into<Value>,
        incoming: impl Into<Value>,
        expected: impl Into<Value>,
    ) {
        let mut result = current.into();
        result.merge(incoming.into());
        assert_eq!(result, expected.into());
    }

    #[test]
    fn merge_value_works_correctly() {
        // Strings are concatenated.
        assert_merge_value("hello ", "world", "hello world");

        // Booleans are replaced by the incoming value.
        assert_merge_value(true, false, false);
        assert_merge_value(false, true, true);

        // Mismatched kinds are replaced by the incoming value.
        assert_merge_value("my_val", true, true);
        assert_merge_value(true, "my_val", "my_val");

        // Integers are replaced by the incoming value.
        assert_merge_value(1, 2, 2);
    }

    #[test]
    fn merge_event_combines_values_accordingly() {
        let fields_to_merge = vec![
            "merge".to_string(),
            "merge_a".to_string(),
            "merge_b".to_string(),
            "merge_c".to_string(),
        ];

        let current = {
            let mut event = LogEvent::default();

            event.insert("merge", "hello ");
            event.insert("do_not_merge", "my_first_value");
            event.insert("merge_a", true);
            event.insert("merge_b", 123i64);
            event.insert("a", true);
            event.insert("b", 123i64);
            event
        };

        let incoming = {
            let mut event = LogEvent::default();

            event.insert("merge", "world");
            event.insert("do_not_merge", "my_second_value");
            event.insert("merge_b", 456i64);
            event.insert("merge_c", false);
            event.insert("b", 456i64);
            event.insert("c", true);
            event
        };

        let mut merged = current;
        merged.merge(incoming, &fields_to_merge);

        // Only the listed fields change; unlisted incoming fields ("c") are not copied.
        let expected = {
            let mut event = LogEvent::default();
            event.insert("merge", "hello world");
            event.insert("do_not_merge", "my_first_value");
            event.insert("a", true);
            event.insert("b", 123i64);
            event.insert("merge_a", true);
            event.insert("merge_b", 456i64);
            event.insert("merge_c", false);
            event
        };

        vector_common::assert_event_data_eq!(merged, expected);
    }

    // Inserting "a.b" replaces the scalar at "a" with an object, so only leaves remain.
    #[test]
    fn event_fields_iter() {
        let mut event = LogEvent::default();
        event.insert("a", 0);
        event.insert("a.b", 1);
        event.insert("c", 2);
        let collected: Vec<(KeyString, Value)> = event
            .all_event_fields()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();
        assert_eq!(
            collected,
            vec![("a.b".into(), 1.into()), ("c".into(), 2.into())]
        );
    }

    // Same as `event_fields_iter`, for `%`-prefixed metadata paths.
    #[test]
    fn metadata_fields_iter() {
        let mut event = LogEvent::default();
        event.insert("%a", 0);
        event.insert("%a.b", 1);
        event.insert("%c", 2);
        let collected: Vec<(KeyString, Value)> = event
            .all_metadata_fields()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();
        assert_eq!(
            collected,
            vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())]
        );
    }

    // Arrays are yielded whole rather than element by element.
    #[test]
    fn skip_array_elements() {
        let event = LogEvent::from(Value::from(btreemap! {
            "arr" => [1],
            "obj" => btreemap! {
                "arr" => [1,2,3]
            },
        }));

        let collected: Vec<(KeyString, Value)> = event
            .all_event_fields_skip_array_elements()
            .unwrap()
            .map(|(key, value)| (key, value.clone()))
            .collect();
        assert_eq!(
            collected,
            vec![
                ("arr".into(), [1].into()),
                ("obj.arr".into(), [1, 2, 3].into())
            ]
        );
    }

    // Default events get a `source_event_id` of UUID version `SortRand`, distinct per event.
    #[test]
    fn metadata_set_unique_uuid_v7_source_event_id() {
        let first = LogEvent::default();
        assert_eq!(
            first
                .metadata()
                .source_event_id()
                .expect("source_event_id should be auto-generated for new events")
                .get_version(),
            Some(Version::SortRand)
        );

        let second = LogEvent::default();
        assert_ne!(
            first.metadata().source_event_id(),
            second.metadata().source_event_id()
        );
    }
}