1use std::{
2 collections::HashMap,
3 convert::{TryFrom, TryInto},
4 fmt::Debug,
5 iter::FromIterator,
6 mem::size_of,
7 num::NonZeroUsize,
8 sync::{Arc, LazyLock},
9};
10
11use crate::event::util::log::all_fields_skip_array_elements;
12use bytes::Bytes;
13use chrono::Utc;
14
15use crossbeam_utils::atomic::AtomicCell;
16use lookup::{lookup_v2::TargetPath, metadata_path, path, PathPrefix};
17use serde::{Deserialize, Serialize, Serializer};
18use vector_common::{
19 byte_size_of::ByteSizeOf,
20 internal_event::{OptionalTag, TaggedEventsSent},
21 json_size::{JsonSize, NonZeroJsonSize},
22 request_metadata::GetEventCountTags,
23 EventDataEq,
24};
25use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError};
26use vrl::{event_path, owned_value_path};
27
28use super::{
29 estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
30 finalization::{BatchNotifier, EventFinalizer},
31 metadata::EventMetadata,
32 util, EventFinalizers, Finalizable, KeyString, ObjectMap, Value,
33};
34use crate::config::LogNamespace;
35use crate::config::{log_schema, telemetry};
36use crate::event::util::log::{all_fields, all_metadata_fields};
37use crate::event::MaybeAsLogMut;
38
39static VECTOR_SOURCE_TYPE_PATH: LazyLock<Option<OwnedTargetPath>> = LazyLock::new(|| {
40 Some(OwnedTargetPath::metadata(owned_value_path!(
41 "vector",
42 "source_type"
43 )))
44});
45
46#[derive(Debug, Deserialize)]
47struct Inner {
48 #[serde(flatten)]
49 fields: Value,
50
51 #[serde(skip)]
52 size_cache: AtomicCell<Option<NonZeroUsize>>,
53
54 #[serde(skip)]
55 json_encoded_size_cache: AtomicCell<Option<NonZeroJsonSize>>,
56}
57
58impl Inner {
59 fn invalidate(&self) {
60 self.size_cache.store(None);
61 self.json_encoded_size_cache.store(None);
62 }
63
64 fn as_value(&self) -> &Value {
65 &self.fields
66 }
67}
68
69impl ByteSizeOf for Inner {
70 fn size_of(&self) -> usize {
71 self.size_cache
72 .load()
73 .unwrap_or_else(|| {
74 let size = size_of::<Self>() + self.allocated_bytes();
75 let size = NonZeroUsize::new(size).expect("Size cannot be zero");
80 self.size_cache.store(Some(size));
81 size
82 })
83 .into()
84 }
85
86 fn allocated_bytes(&self) -> usize {
87 self.fields.allocated_bytes()
88 }
89}
90
91impl EstimatedJsonEncodedSizeOf for Inner {
92 fn estimated_json_encoded_size_of(&self) -> JsonSize {
93 self.json_encoded_size_cache
94 .load()
95 .unwrap_or_else(|| {
96 let size = self.fields.estimated_json_encoded_size_of();
97 let size = NonZeroJsonSize::new(size).expect("Size cannot be zero");
98
99 self.json_encoded_size_cache.store(Some(size));
100 size
101 })
102 .into()
103 }
104}
105
106impl Clone for Inner {
107 fn clone(&self) -> Self {
108 Self {
109 fields: self.fields.clone(),
110 size_cache: None.into(),
114
115 json_encoded_size_cache: None.into(),
119 }
120 }
121}
122
123impl Default for Inner {
124 fn default() -> Self {
125 Self {
126 fields: Value::Object(Default::default()),
128 size_cache: Default::default(),
129 json_encoded_size_cache: Default::default(),
130 }
131 }
132}
133
134impl From<Value> for Inner {
135 fn from(fields: Value) -> Self {
136 Self {
137 fields,
138 size_cache: Default::default(),
139 json_encoded_size_cache: Default::default(),
140 }
141 }
142}
143
144impl PartialEq for Inner {
145 fn eq(&self, other: &Self) -> bool {
146 self.fields.eq(&other.fields)
147 }
148}
149
150#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
151pub struct LogEvent {
152 #[serde(flatten)]
153 inner: Arc<Inner>,
154
155 #[serde(skip)]
156 metadata: EventMetadata,
157}
158
159impl LogEvent {
160 pub fn from_str_legacy(msg: impl Into<String>) -> Self {
163 let mut log = LogEvent::default();
164 log.maybe_insert(log_schema().message_key_target_path(), msg.into());
165
166 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
167 log.insert(timestamp_key, Utc::now());
168 }
169
170 log
171 }
172
173 pub fn from_bytes_legacy(msg: &Bytes) -> Self {
176 Self::from_str_legacy(String::from_utf8_lossy(msg.as_ref()).to_string())
177 }
178
179 pub fn value(&self) -> &Value {
180 self.inner.as_ref().as_value()
181 }
182
183 pub fn value_mut(&mut self) -> &mut Value {
184 let result = Arc::make_mut(&mut self.inner);
185 result.invalidate();
188 &mut result.fields
189 }
190
191 pub fn metadata(&self) -> &EventMetadata {
192 &self.metadata
193 }
194
195 pub fn metadata_mut(&mut self) -> &mut EventMetadata {
196 &mut self.metadata
197 }
198
199 pub fn namespace(&self) -> LogNamespace {
203 if self.contains((PathPrefix::Metadata, path!("vector"))) {
204 LogNamespace::Vector
205 } else {
206 LogNamespace::Legacy
207 }
208 }
209}
210
211impl ByteSizeOf for LogEvent {
212 fn allocated_bytes(&self) -> usize {
213 self.inner.size_of() + self.metadata.allocated_bytes()
214 }
215}
216
217impl Finalizable for LogEvent {
218 fn take_finalizers(&mut self) -> EventFinalizers {
219 self.metadata.take_finalizers()
220 }
221}
222
223impl EstimatedJsonEncodedSizeOf for LogEvent {
224 fn estimated_json_encoded_size_of(&self) -> JsonSize {
225 self.inner.estimated_json_encoded_size_of()
226 }
227}
228
229impl GetEventCountTags for LogEvent {
230 fn get_tags(&self) -> TaggedEventsSent {
231 let source = if telemetry().tags().emit_source {
232 self.metadata().source_id().cloned().into()
233 } else {
234 OptionalTag::Ignored
235 };
236
237 let service = if telemetry().tags().emit_service {
238 self.get_by_meaning("service")
239 .map(|value| value.to_string_lossy().to_string())
240 .into()
241 } else {
242 OptionalTag::Ignored
243 };
244
245 TaggedEventsSent { source, service }
246 }
247}
248
249impl LogEvent {
250 #[must_use]
251 pub fn new_with_metadata(metadata: EventMetadata) -> Self {
252 Self {
253 inner: Default::default(),
254 metadata,
255 }
256 }
257
258 pub fn from_parts(value: Value, metadata: EventMetadata) -> Self {
260 Self {
261 inner: Arc::new(value.into()),
262 metadata,
263 }
264 }
265
266 pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self {
268 let inner = Arc::new(Inner::from(Value::Object(map)));
269 Self { inner, metadata }
270 }
271
272 pub fn into_parts(mut self) -> (Value, EventMetadata) {
274 self.value_mut();
275
276 let value = Arc::try_unwrap(self.inner)
277 .unwrap_or_else(|_| unreachable!("inner fields already cloned after owning"))
278 .fields;
279 let metadata = self.metadata;
280 (value, metadata)
281 }
282
283 #[must_use]
284 pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
285 self.metadata = self.metadata.with_batch_notifier(batch);
286 self
287 }
288
289 #[must_use]
290 pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
291 self.metadata = self.metadata.with_batch_notifier_option(batch);
292 self
293 }
294
295 pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
296 self.metadata.add_finalizer(finalizer);
297 }
298
299 pub fn parse_path_and_get_value(
303 &self,
304 path: impl AsRef<str>,
305 ) -> Result<Option<&Value>, PathParseError> {
306 parse_target_path(path.as_ref()).map(|path| self.get(&path))
307 }
308
309 #[allow(clippy::needless_pass_by_value)] pub fn get<'a>(&self, key: impl TargetPath<'a>) -> Option<&Value> {
311 match key.prefix() {
312 PathPrefix::Event => self.inner.fields.get(key.value_path()),
313 PathPrefix::Metadata => self.metadata.value().get(key.value_path()),
314 }
315 }
316
317 pub fn get_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&Value> {
322 self.metadata().dropped_field(&meaning).or_else(|| {
323 self.metadata()
324 .schema_definition()
325 .meaning_path(meaning.as_ref())
326 .and_then(|path| self.get(path))
327 })
328 }
329
330 pub fn get_mut_by_meaning(&mut self, meaning: impl AsRef<str>) -> Option<&mut Value> {
335 Arc::clone(self.metadata.schema_definition())
336 .meaning_path(meaning.as_ref())
337 .and_then(|path| self.get_mut(path))
338 }
339
340 pub fn find_key_by_meaning(&self, meaning: impl AsRef<str>) -> Option<&OwnedTargetPath> {
342 self.metadata()
343 .schema_definition()
344 .meaning_path(meaning.as_ref())
345 }
346
347 #[allow(clippy::needless_pass_by_value)] pub fn get_mut<'a>(&mut self, path: impl TargetPath<'a>) -> Option<&mut Value> {
349 match path.prefix() {
350 PathPrefix::Event => self.value_mut().get_mut(path.value_path()),
351 PathPrefix::Metadata => self.metadata.value_mut().get_mut(path.value_path()),
352 }
353 }
354
355 #[allow(clippy::needless_pass_by_value)] pub fn contains<'a>(&self, path: impl TargetPath<'a>) -> bool {
357 match path.prefix() {
358 PathPrefix::Event => self.value().contains(path.value_path()),
359 PathPrefix::Metadata => self.metadata.value().contains(path.value_path()),
360 }
361 }
362
363 pub fn parse_path_and_insert(
368 &mut self,
369 path: impl AsRef<str>,
370 value: impl Into<Value>,
371 ) -> Result<Option<Value>, PathParseError> {
372 let target_path = parse_target_path(path.as_ref())?;
373 Ok(self.insert(&target_path, value))
374 }
375
376 #[allow(clippy::needless_pass_by_value)] pub fn insert<'a>(
378 &mut self,
379 path: impl TargetPath<'a>,
380 value: impl Into<Value>,
381 ) -> Option<Value> {
382 match path.prefix() {
383 PathPrefix::Event => self.value_mut().insert(path.value_path(), value.into()),
384 PathPrefix::Metadata => self
385 .metadata
386 .value_mut()
387 .insert(path.value_path(), value.into()),
388 }
389 }
390
391 pub fn maybe_insert<'a>(&mut self, path: Option<impl TargetPath<'a>>, value: impl Into<Value>) {
392 if let Some(path) = path {
393 self.insert(path, value);
394 }
395 }
396
397 pub fn try_insert<'a>(&mut self, path: impl TargetPath<'a>, value: impl Into<Value>) {
399 if !self.contains(path.clone()) {
400 self.insert(path, value);
401 }
402 }
403
404 pub fn rename_key<'a>(&mut self, from: impl TargetPath<'a>, to: impl TargetPath<'a>) {
408 if let Some(val) = self.remove(from) {
409 self.insert(to, val);
410 }
411 }
412
413 pub fn remove<'a>(&mut self, path: impl TargetPath<'a>) -> Option<Value> {
414 self.remove_prune(path, false)
415 }
416
417 #[allow(clippy::needless_pass_by_value)] pub fn remove_prune<'a>(&mut self, path: impl TargetPath<'a>, prune: bool) -> Option<Value> {
419 match path.prefix() {
420 PathPrefix::Event => self.value_mut().remove(path.value_path(), prune),
421 PathPrefix::Metadata => self.metadata.value_mut().remove(path.value_path(), prune),
422 }
423 }
424
425 pub fn keys(&self) -> Option<impl Iterator<Item = KeyString> + '_> {
426 match &self.inner.fields {
427 Value::Object(map) => Some(util::log::keys(map)),
428 _ => None,
429 }
430 }
431
432 pub fn all_event_fields(
435 &self,
436 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
437 self.as_map().map(all_fields)
438 }
439
440 pub fn all_event_fields_skip_array_elements(
442 &self,
443 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
444 self.as_map().map(all_fields_skip_array_elements)
445 }
446
447 pub fn all_metadata_fields(
450 &self,
451 ) -> Option<impl Iterator<Item = (KeyString, &Value)> + Serialize> {
452 match self.metadata.value() {
453 Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
454 _ => None,
455 }
456 }
457
458 pub fn convert_to_fields(&self) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
462 if let Some(map) = self.as_map() {
463 util::log::all_fields(map)
464 } else {
465 util::log::all_fields_non_object_root(self.value())
466 }
467 }
468
469 pub fn convert_to_fields_unquoted(
472 &self,
473 ) -> impl Iterator<Item = (KeyString, &Value)> + Serialize {
474 if let Some(map) = self.as_map() {
475 util::log::all_fields_unquoted(map)
476 } else {
477 util::log::all_fields_non_object_root(self.value())
478 }
479 }
480
481 pub fn is_empty_object(&self) -> bool {
482 if let Some(map) = self.as_map() {
483 map.is_empty()
484 } else {
485 false
486 }
487 }
488
489 pub fn as_map(&self) -> Option<&ObjectMap> {
490 match self.value() {
491 Value::Object(map) => Some(map),
492 _ => None,
493 }
494 }
495
496 pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> {
497 match self.value_mut() {
498 Value::Object(map) => Some(map),
499 _ => None,
500 }
501 }
502
503 pub fn merge(&mut self, mut incoming: LogEvent, fields: &[impl AsRef<str>]) {
506 for field in fields {
507 let field_path = event_path!(field.as_ref());
508 let Some(incoming_val) = incoming.remove(field_path) else {
509 continue;
510 };
511 match self.get_mut(field_path) {
512 None => {
513 self.insert(field_path, incoming_val);
514 }
515 Some(current_val) => current_val.merge(incoming_val),
516 }
517 }
518 self.metadata.merge(incoming.metadata);
519 }
520}
521
522impl LogEvent {
525 pub fn message_path(&self) -> Option<&OwnedTargetPath> {
528 match self.namespace() {
529 LogNamespace::Vector => self.find_key_by_meaning("message"),
530 LogNamespace::Legacy => log_schema().message_key_target_path(),
531 }
532 }
533
534 pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
537 match self.namespace() {
538 LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
539 LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
540 }
541 }
542
543 pub fn host_path(&self) -> Option<&OwnedTargetPath> {
546 match self.namespace() {
547 LogNamespace::Vector => self.find_key_by_meaning("host"),
548 LogNamespace::Legacy => log_schema().host_key_target_path(),
549 }
550 }
551
552 pub fn source_type_path(&self) -> Option<&OwnedTargetPath> {
555 match self.namespace() {
556 LogNamespace::Vector => VECTOR_SOURCE_TYPE_PATH.as_ref(),
557 LogNamespace::Legacy => log_schema().source_type_key_target_path(),
558 }
559 }
560
561 pub fn get_message(&self) -> Option<&Value> {
564 match self.namespace() {
565 LogNamespace::Vector => self.get_by_meaning("message"),
566 LogNamespace::Legacy => log_schema()
567 .message_key_target_path()
568 .and_then(|key| self.get(key)),
569 }
570 }
571
572 pub fn get_timestamp(&self) -> Option<&Value> {
575 match self.namespace() {
576 LogNamespace::Vector => self.get_by_meaning("timestamp"),
577 LogNamespace::Legacy => log_schema()
578 .timestamp_key_target_path()
579 .and_then(|key| self.get(key)),
580 }
581 }
582
583 pub fn remove_timestamp(&mut self) -> Option<Value> {
586 self.timestamp_path()
587 .cloned()
588 .and_then(|key| self.remove(&key))
589 }
590
591 pub fn get_host(&self) -> Option<&Value> {
594 match self.namespace() {
595 LogNamespace::Vector => self.get_by_meaning("host"),
596 LogNamespace::Legacy => log_schema()
597 .host_key_target_path()
598 .and_then(|key| self.get(key)),
599 }
600 }
601
602 pub fn get_source_type(&self) -> Option<&Value> {
605 match self.namespace() {
606 LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
607 LogNamespace::Legacy => log_schema()
608 .source_type_key_target_path()
609 .and_then(|key| self.get(key)),
610 }
611 }
612}
613
614impl MaybeAsLogMut for LogEvent {
615 fn maybe_as_log_mut(&mut self) -> Option<&mut LogEvent> {
616 Some(self)
617 }
618}
619
620impl EventDataEq for LogEvent {
621 fn event_data_eq(&self, other: &Self) -> bool {
622 self.inner.fields == other.inner.fields && self.metadata.event_data_eq(&other.metadata)
623 }
624}
625
626#[cfg(any(test, feature = "test"))]
627mod test_utils {
628 use super::{log_schema, Bytes, LogEvent, Utc};
629
630 impl From<Bytes> for LogEvent {
637 fn from(message: Bytes) -> Self {
638 let mut log = LogEvent::default();
639 log.maybe_insert(log_schema().message_key_target_path(), message);
640 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
641 log.insert(timestamp_key, Utc::now());
642 }
643 log
644 }
645 }
646
647 impl From<&str> for LogEvent {
648 fn from(message: &str) -> Self {
649 message.to_owned().into()
650 }
651 }
652
653 impl From<String> for LogEvent {
654 fn from(message: String) -> Self {
655 Bytes::from(message).into()
656 }
657 }
658}
659
660impl From<Value> for LogEvent {
661 fn from(value: Value) -> Self {
662 Self::from_parts(value, EventMetadata::default())
663 }
664}
665
666impl From<ObjectMap> for LogEvent {
667 fn from(map: ObjectMap) -> Self {
668 Self::from_parts(Value::Object(map), EventMetadata::default())
669 }
670}
671
672impl From<HashMap<KeyString, Value>> for LogEvent {
673 fn from(map: HashMap<KeyString, Value>) -> Self {
674 Self::from_parts(
675 Value::Object(map.into_iter().collect::<ObjectMap>()),
676 EventMetadata::default(),
677 )
678 }
679}
680
681impl TryFrom<serde_json::Value> for LogEvent {
682 type Error = crate::Error;
683
684 fn try_from(map: serde_json::Value) -> Result<Self, Self::Error> {
685 match map {
686 serde_json::Value::Object(fields) => Ok(LogEvent::from(
687 fields
688 .into_iter()
689 .map(|(k, v)| (k.into(), v.into()))
690 .collect::<ObjectMap>(),
691 )),
692 _ => Err(crate::Error::from(
693 "Attempted to convert non-Object JSON into a LogEvent.",
694 )),
695 }
696 }
697}
698
699impl TryInto<serde_json::Value> for LogEvent {
700 type Error = crate::Error;
701
702 fn try_into(self) -> Result<serde_json::Value, Self::Error> {
703 Ok(serde_json::to_value(&self.inner.fields)?)
704 }
705}
706
707#[cfg(any(test, feature = "test"))]
708impl<T> std::ops::Index<T> for LogEvent
709where
710 T: AsRef<str>,
711{
712 type Output = Value;
713
714 fn index(&self, key: T) -> &Value {
715 self.parse_path_and_get_value(key.as_ref())
716 .ok()
717 .flatten()
718 .unwrap_or_else(|| panic!("Key is not found: {:?}", key.as_ref()))
719 }
720}
721
722impl<K, V> Extend<(K, V)> for LogEvent
723where
724 K: AsRef<str>,
725 V: Into<Value>,
726{
727 fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
728 for (k, v) in iter {
729 if let Ok(path) = parse_target_path(k.as_ref()) {
730 self.insert(&path, v.into());
731 }
732 }
733 }
734}
735
736impl<K: AsRef<str>, V: Into<Value>> FromIterator<(K, V)> for LogEvent {
738 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
739 let mut log_event = Self::default();
740 log_event.extend(iter);
741 log_event
742 }
743}
744
745impl Serialize for LogEvent {
746 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
747 where
748 S: Serializer,
749 {
750 self.value().serialize(serializer)
751 }
752}
753
754struct TracingTargetPaths {
756 pub(crate) timestamp: OwnedTargetPath,
757 pub(crate) kind: OwnedTargetPath,
758 pub(crate) module_path: OwnedTargetPath,
759 pub(crate) level: OwnedTargetPath,
760 pub(crate) target: OwnedTargetPath,
761}
762
763static TRACING_TARGET_PATHS: LazyLock<TracingTargetPaths> = LazyLock::new(|| TracingTargetPaths {
765 timestamp: OwnedTargetPath::event(owned_value_path!("timestamp")),
766 kind: OwnedTargetPath::event(owned_value_path!("metadata", "kind")),
767 level: OwnedTargetPath::event(owned_value_path!("metadata", "level")),
768 module_path: OwnedTargetPath::event(owned_value_path!("metadata", "module_path")),
769 target: OwnedTargetPath::event(owned_value_path!("metadata", "target")),
770});
771
772impl From<&tracing::Event<'_>> for LogEvent {
773 fn from(event: &tracing::Event<'_>) -> Self {
774 let now = chrono::Utc::now();
775 let mut maker = LogEvent::default();
776 event.record(&mut maker);
777
778 let mut log = maker;
779 log.insert(&TRACING_TARGET_PATHS.timestamp, now);
780
781 let meta = event.metadata();
782 log.insert(
783 &TRACING_TARGET_PATHS.kind,
784 if meta.is_event() {
785 Value::Bytes("event".to_string().into())
786 } else if meta.is_span() {
787 Value::Bytes("span".to_string().into())
788 } else {
789 Value::Null
790 },
791 );
792 log.insert(&TRACING_TARGET_PATHS.level, meta.level().to_string());
793 log.insert(
794 &TRACING_TARGET_PATHS.module_path,
795 meta.module_path()
796 .map_or(Value::Null, |mp| Value::Bytes(mp.to_string().into())),
797 );
798 log.insert(&TRACING_TARGET_PATHS.target, meta.target().to_string());
799 log
800 }
801}
802
803impl tracing::field::Visit for LogEvent {
805 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
806 self.insert(event_path!(field.name()), value.to_string());
807 }
808
809 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn Debug) {
810 self.insert(event_path!(field.name()), format!("{value:?}"));
811 }
812
813 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
814 self.insert(event_path!(field.name()), value);
815 }
816
817 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
818 let field_path = event_path!(field.name());
819 let converted: Result<i64, _> = value.try_into();
820 match converted {
821 Ok(value) => self.insert(field_path, value),
822 Err(_) => self.insert(field_path, value.to_string()),
823 };
824 }
825
826 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
827 self.insert(event_path!(field.name()), value);
828 }
829}
830
831#[cfg(test)]
832mod test {
833 use super::*;
834 use crate::test_util::open_fixture;
835 use lookup::event_path;
836 use uuid::Version;
837 use vrl::{btreemap, value};
838
839 #[test]
842 fn rename_key_flat_equiv_exists() {
843 let value = value!({
844 one: 1,
845 two: 2
846 });
847
848 let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
849 base.rename_key(event_path!("one"), event_path!("one"));
850 let (actual_fields, _) = base.into_parts();
851
852 assert_eq!(value, actual_fields);
853 }
854 #[test]
855 fn rename_key_flat_equiv_not_exists() {
856 let value = value!({
857 one: 1,
858 two: 2
859 });
860
861 let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
862 base.rename_key(event_path!("three"), event_path!("three"));
863 let (actual_fields, _) = base.into_parts();
864
865 assert_eq!(value, actual_fields);
866 }
867 #[test]
870 fn rename_key_flat_not_exists() {
871 let value = value!({
872 one: 1,
873 two: 2
874 });
875
876 let mut base = LogEvent::from_parts(value.clone(), EventMetadata::default());
877 base.rename_key(event_path!("three"), event_path!("four"));
878 let (actual_fields, _) = base.into_parts();
879
880 assert_eq!(value, actual_fields);
881 }
882 #[test]
885 fn rename_key_flat_no_overlap() {
886 let value = value!({
887 one: 1,
888 two: 2
889 });
890
891 let mut expected_value = value.clone();
892 let one = expected_value.remove("one", true).unwrap();
893 expected_value.insert("three", one);
894
895 let mut base = LogEvent::from_parts(value, EventMetadata::default());
896 base.rename_key(event_path!("one"), event_path!("three"));
897 let (actual_fields, _) = base.into_parts();
898
899 assert_eq!(expected_value, actual_fields);
900 }
901 #[test]
905 fn rename_key_flat_overlap() {
906 let value = value!({
907 one: 1,
908 two: 2
909 });
910
911 let mut expected_value = value.clone();
912 let val = expected_value.remove("one", true).unwrap();
913 expected_value.insert("two", val);
914
915 let mut base = LogEvent::from_parts(value, EventMetadata::default());
916 base.rename_key(event_path!("one"), event_path!("two"));
917 let (actual_value, _) = base.into_parts();
918
919 assert_eq!(expected_value, actual_value);
920 }
921
922 #[test]
923 fn insert() {
924 let mut log = LogEvent::default();
925
926 let old = log.insert("foo", "foo");
927
928 assert_eq!(log.get("foo"), Some(&"foo".into()));
929 assert_eq!(old, None);
930 }
931
932 #[test]
933 fn insert_existing() {
934 let mut log = LogEvent::default();
935 log.insert("foo", "foo");
936
937 let old = log.insert("foo", "bar");
938
939 assert_eq!(log.get("foo"), Some(&"bar".into()));
940 assert_eq!(old, Some("foo".into()));
941 }
942
943 #[test]
944 fn try_insert() {
945 let mut log = LogEvent::default();
946
947 log.try_insert("foo", "foo");
948
949 assert_eq!(log.get("foo"), Some(&"foo".into()));
950 }
951
952 #[test]
953 fn try_insert_existing() {
954 let mut log = LogEvent::default();
955 log.insert("foo", "foo");
956
957 log.try_insert("foo", "bar");
958
959 assert_eq!(log.get("foo"), Some(&"foo".into()));
960 }
961
962 #[test]
963 fn try_insert_dotted() {
964 let mut log = LogEvent::default();
965
966 log.try_insert("foo.bar", "foo");
967
968 assert_eq!(log.get("foo.bar"), Some(&"foo".into()));
969 assert_eq!(log.get(event_path!("foo.bar")), None);
970 }
971
972 #[test]
973 fn try_insert_existing_dotted() {
974 let mut log = LogEvent::default();
975 log.insert("foo.bar", "foo");
976
977 log.try_insert("foo.bar", "bar");
978
979 assert_eq!(log.get("foo.bar"), Some(&"foo".into()));
980 assert_eq!(log.get(event_path!("foo.bar")), None);
981 }
982
983 #[test]
984 fn try_insert_flat() {
985 let mut log = LogEvent::default();
986
987 log.try_insert(event_path!("foo"), "foo");
988
989 assert_eq!(log.get(event_path!("foo")), Some(&"foo".into()));
990 }
991
992 #[test]
993 fn try_insert_flat_existing() {
994 let mut log = LogEvent::default();
995 log.insert(event_path!("foo"), "foo");
996
997 log.try_insert(event_path!("foo"), "bar");
998
999 assert_eq!(log.get(event_path!("foo")), Some(&"foo".into()));
1000 }
1001
1002 #[test]
1003 fn try_insert_flat_dotted() {
1004 let mut log = LogEvent::default();
1005
1006 log.try_insert(event_path!("foo.bar"), "foo");
1007
1008 assert_eq!(log.get(event_path!("foo.bar")), Some(&"foo".into()));
1009 assert_eq!(log.get("foo.bar"), None);
1010 }
1011
1012 #[test]
1013 fn try_insert_flat_existing_dotted() {
1014 let mut log = LogEvent::default();
1015 log.insert(event_path!("foo.bar"), "foo");
1016
1017 log.try_insert(event_path!("foo.bar"), "bar");
1018
1019 assert_eq!(log.get(event_path!("foo.bar")), Some(&"foo".into()));
1020 assert_eq!(log.get("foo.bar"), None);
1021 }
1022
1023 #[test]
1032 fn json_value_to_vector_log_event_to_json_value() {
1033 const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event";
1034
1035 std::fs::read_dir(FIXTURE_ROOT)
1036 .unwrap()
1037 .for_each(|fixture_file| match fixture_file {
1038 Ok(fixture_file) => {
1039 let path = fixture_file.path();
1040 tracing::trace!(?path, "Opening.");
1041 let serde_value = open_fixture(&path).unwrap();
1042
1043 let vector_value = LogEvent::try_from(serde_value.clone()).unwrap();
1044 let serde_value_again: serde_json::Value = vector_value.try_into().unwrap();
1045
1046 assert_eq!(serde_value, serde_value_again);
1047 }
1048 _ => panic!("This test should never read Err'ing test fixtures."),
1049 });
1050 }
1051
1052 fn assert_merge_value(
1053 current: impl Into<Value>,
1054 incoming: impl Into<Value>,
1055 expected: impl Into<Value>,
1056 ) {
1057 let mut merged = current.into();
1058 merged.merge(incoming.into());
1059 assert_eq!(merged, expected.into());
1060 }
1061
1062 #[test]
1063 fn merge_value_works_correctly() {
1064 assert_merge_value("hello ", "world", "hello world");
1065
1066 assert_merge_value(true, false, false);
1067 assert_merge_value(false, true, true);
1068
1069 assert_merge_value("my_val", true, true);
1070 assert_merge_value(true, "my_val", "my_val");
1071
1072 assert_merge_value(1, 2, 2);
1073 }
1074
1075 #[test]
1076 fn merge_event_combines_values_accordingly() {
1077 let fields_to_merge = vec![
1081 "merge".to_string(),
1082 "merge_a".to_string(),
1083 "merge_b".to_string(),
1084 "merge_c".to_string(),
1085 ];
1086
1087 let current = {
1088 let mut log = LogEvent::default();
1089
1090 log.insert("merge", "hello "); log.insert("do_not_merge", "my_first_value"); log.insert("merge_a", true); log.insert("merge_b", 123i64); log.insert("a", true); log.insert("b", 123i64); log
1103 };
1104
1105 let incoming = {
1106 let mut log = LogEvent::default();
1107
1108 log.insert("merge", "world"); log.insert("do_not_merge", "my_second_value"); log.insert("merge_b", 456i64); log.insert("merge_c", false); log.insert("b", 456i64); log.insert("c", true); log
1120 };
1121
1122 let mut merged = current;
1123 merged.merge(incoming, &fields_to_merge);
1124
1125 let expected = {
1126 let mut log = LogEvent::default();
1127 log.insert("merge", "hello world");
1128 log.insert("do_not_merge", "my_first_value");
1129 log.insert("a", true);
1130 log.insert("b", 123i64);
1131 log.insert("merge_a", true);
1132 log.insert("merge_b", 456i64);
1133 log.insert("merge_c", false);
1134 log
1135 };
1136
1137 vector_common::assert_event_data_eq!(merged, expected);
1138 }
1139
1140 #[test]
1141 fn event_fields_iter() {
1142 let mut log = LogEvent::default();
1143 log.insert("a", 0);
1144 log.insert("a.b", 1);
1145 log.insert("c", 2);
1146 let actual: Vec<(KeyString, Value)> = log
1147 .all_event_fields()
1148 .unwrap()
1149 .map(|(s, v)| (s, v.clone()))
1150 .collect();
1151 assert_eq!(
1152 actual,
1153 vec![("a.b".into(), 1.into()), ("c".into(), 2.into())]
1154 );
1155 }
1156
1157 #[test]
1158 fn metadata_fields_iter() {
1159 let mut log = LogEvent::default();
1160 log.insert("%a", 0);
1161 log.insert("%a.b", 1);
1162 log.insert("%c", 2);
1163 let actual: Vec<(KeyString, Value)> = log
1164 .all_metadata_fields()
1165 .unwrap()
1166 .map(|(s, v)| (s, v.clone()))
1167 .collect();
1168 assert_eq!(
1169 actual,
1170 vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())]
1171 );
1172 }
1173
1174 #[test]
1175 fn skip_array_elements() {
1176 let log = LogEvent::from(Value::from(btreemap! {
1177 "arr" => [1],
1178 "obj" => btreemap! {
1179 "arr" => [1,2,3]
1180 },
1181 }));
1182
1183 let actual: Vec<(KeyString, Value)> = log
1184 .all_event_fields_skip_array_elements()
1185 .unwrap()
1186 .map(|(s, v)| (s, v.clone()))
1187 .collect();
1188 assert_eq!(
1189 actual,
1190 vec![
1191 ("arr".into(), [1].into()),
1192 ("obj.arr".into(), [1, 2, 3].into())
1193 ]
1194 );
1195 }
1196
1197 #[test]
1198 fn metadata_set_unique_uuid_v7_source_event_id() {
1199 let log1 = LogEvent::default();
1201 assert_eq!(
1202 log1.metadata()
1203 .source_event_id()
1204 .expect("source_event_id should be auto-generated for new events")
1205 .get_version(),
1206 Some(Version::SortRand)
1207 );
1208
1209 let log2 = LogEvent::default();
1211 assert_ne!(
1212 log1.metadata().source_event_id(),
1213 log2.metadata().source_event_id()
1214 );
1215 }
1216}