1use std::{
2 borrow::Cow,
3 collections::BTreeMap,
4 convert::TryFrom,
5 marker::PhantomData,
6 num::{NonZero, TryFromIntError},
7};
8
9use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix, lookup_v2::OwnedSegment};
10use snafu::Snafu;
11use vrl::{
12 compiler::{ProgramInfo, SecretTarget, Target, value::VrlValueConvert},
13 prelude::Collection,
14 value::{Kind, ObjectMap, Value},
15};
16
17use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent, metric::TagValue};
18use crate::{
19 config::{LogNamespace, log_schema},
20 schema::Definition,
21};
22
23const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .interval_ms, .timestamp, .kind, .tags";
24
25const VALID_METRIC_PATHS_GET: &str =
27 ".name, .namespace, .interval_ms, .timestamp, .kind, .tags, .type";
28
29const MAX_METRIC_PATH_DEPTH: usize = 3;
34
35#[allow(clippy::large_enum_variant)]
37#[derive(Debug, Clone)]
38pub enum VrlTarget {
39 LogEvent(Value, EventMetadata),
42 Metric {
43 metric: Metric,
44 value: Value,
45 multi_value_tags: bool,
46 },
47 Trace(Value, EventMetadata),
48}
49
50pub enum TargetEvents {
51 One(Event),
52 Logs(TargetIter<LogEvent>),
53 Traces(TargetIter<TraceEvent>),
54}
55
56pub struct TargetIter<T> {
57 iter: std::vec::IntoIter<Value>,
58 metadata: EventMetadata,
59 _marker: PhantomData<T>,
60 log_namespace: LogNamespace,
61}
62
63fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
64 let mut log = LogEvent::new_with_metadata(metadata);
65 log.maybe_insert(log_schema().message_key_target_path(), value);
66 log
67}
68
69impl Iterator for TargetIter<LogEvent> {
70 type Item = Event;
71
72 fn next(&mut self) -> Option<Self::Item> {
73 self.iter.next().map(|v| {
74 match self.log_namespace {
75 LogNamespace::Legacy => match v {
76 value @ Value::Object(_) => LogEvent::from_parts(value, self.metadata.clone()),
77 value => create_log_event(value, self.metadata.clone()),
78 },
79 LogNamespace::Vector => LogEvent::from_parts(v, self.metadata.clone()),
80 }
81 .into()
82 })
83 }
84}
85
86impl Iterator for TargetIter<TraceEvent> {
87 type Item = Event;
88
89 fn next(&mut self) -> Option<Self::Item> {
90 self.iter.next().map(|v| {
91 match v {
92 value @ Value::Object(_) => {
93 TraceEvent::from(LogEvent::from_parts(value, self.metadata.clone()))
94 }
95 value => TraceEvent::from(create_log_event(value, self.metadata.clone())),
96 }
97 .into()
98 })
99 }
100}
101
102impl VrlTarget {
103 pub fn new(event: Event, info: &ProgramInfo, multi_value_metric_tags: bool) -> Self {
104 match event {
105 Event::Log(event) => {
106 let (value, metadata) = event.into_parts();
107 VrlTarget::LogEvent(value, metadata)
108 }
109 Event::Metric(metric) => {
110 let value = precompute_metric_value(&metric, info, multi_value_metric_tags);
114
115 VrlTarget::Metric {
116 metric,
117 value,
118 multi_value_tags: multi_value_metric_tags,
119 }
120 }
121 Event::Trace(event) => {
122 let (fields, metadata) = event.into_parts();
123 VrlTarget::Trace(Value::Object(fields), metadata)
124 }
125 }
126 }
127
128 pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
130 let log_namespaces = input.log_namespaces().clone();
131
132 let merged_arrays = merge_array_definitions(input);
134 Definition::combine_log_namespaces(
135 &log_namespaces,
136 move_field_definitions_into_message(merged_arrays.clone()),
137 merged_arrays,
138 )
139 }
140
141 pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
146 match self {
147 VrlTarget::LogEvent(value, metadata) => match value {
148 value @ Value::Object(_) => {
149 TargetEvents::One(LogEvent::from_parts(value, metadata).into())
150 }
151
152 Value::Array(values) => TargetEvents::Logs(TargetIter {
153 iter: values.into_iter(),
154 metadata,
155 _marker: PhantomData,
156 log_namespace,
157 }),
158
159 v => match log_namespace {
160 LogNamespace::Vector => {
161 TargetEvents::One(LogEvent::from_parts(v, metadata).into())
162 }
163 LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()),
164 },
165 },
166 VrlTarget::Trace(value, metadata) => match value {
167 value @ Value::Object(_) => {
168 let log = LogEvent::from_parts(value, metadata);
169 TargetEvents::One(TraceEvent::from(log).into())
170 }
171
172 Value::Array(values) => TargetEvents::Traces(TargetIter {
173 iter: values.into_iter(),
174 metadata,
175 _marker: PhantomData,
176 log_namespace,
177 }),
178
179 v => TargetEvents::One(create_log_event(v, metadata).into()),
180 },
181 VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)),
182 }
183 }
184
185 fn metadata(&self) -> &EventMetadata {
186 match self {
187 VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
188 VrlTarget::Metric { metric, .. } => metric.metadata(),
189 }
190 }
191
192 fn metadata_mut(&mut self) -> &mut EventMetadata {
193 match self {
194 VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
195 VrlTarget::Metric { metric, .. } => metric.metadata_mut(),
196 }
197 }
198}
199
200fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
203 let mut message = definition.event_kind().clone();
204 message.remove_object();
205 message.remove_array();
206
207 if !message.is_never()
208 && let Some(message_key) = log_schema().message_key()
209 {
210 let message = Kind::object(Collection::from(BTreeMap::from([(
213 message_key.to_string().into(),
214 message,
215 )])));
216
217 definition.event_kind_mut().remove_bytes();
218 definition.event_kind_mut().remove_integer();
219 definition.event_kind_mut().remove_float();
220 definition.event_kind_mut().remove_boolean();
221 definition.event_kind_mut().remove_timestamp();
222 definition.event_kind_mut().remove_regex();
223 definition.event_kind_mut().remove_null();
224
225 *definition.event_kind_mut() = definition.event_kind().union(message);
226 }
227
228 definition
229}
230
231fn merge_array_definitions(mut definition: Definition) -> Definition {
238 if let Some(array) = definition.event_kind().as_array() {
239 let array_kinds = array.reduced_kind();
240
241 let kind = definition.event_kind_mut();
242 kind.remove_array();
243 *kind = kind.union(array_kinds);
244 }
245
246 definition
247}
248
249fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
250 if multi_value_tags {
251 let values = if let Value::Array(values) = value {
252 values.as_slice()
253 } else {
254 std::slice::from_ref(value)
255 };
256
257 let tag_values = values
258 .iter()
259 .filter_map(|value| match value {
260 Value::Bytes(bytes) => {
261 Some(TagValue::Value(String::from_utf8_lossy(bytes).to_string()))
262 }
263 Value::Null => Some(TagValue::Bare),
264 _ => None,
265 })
266 .collect::<Vec<_>>();
267
268 metric.set_multi_value_tag(name, tag_values);
269 } else {
270 if let Ok(tag_value) = value.try_bytes_utf8_lossy().map(Cow::into_owned) {
272 metric.replace_tag(name, tag_value);
273 } else if value.is_null() {
274 metric.set_multi_value_tag(name, vec![TagValue::Bare]);
275 }
276 }
277}
278
279impl Target for VrlTarget {
280 fn target_insert(&mut self, target_path: &OwnedTargetPath, value: Value) -> Result<(), String> {
281 let path = &target_path.path;
282 match target_path.prefix {
283 PathPrefix::Event => match self {
284 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
285 log.insert(path, value);
286 Ok(())
287 }
288 VrlTarget::Metric {
289 metric,
290 value: metric_value,
291 multi_value_tags,
292 } => {
293 if path.is_root() {
294 return Err(MetricPathError::SetPathError.to_string());
295 }
296
297 if let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
298 match paths.as_slice() {
299 ["tags"] => {
300 let value =
301 value.clone().try_object().map_err(|e| e.to_string())?;
302
303 metric.remove_tags();
304 for (field, value) in &value {
305 set_metric_tag_values(
306 field[..].into(),
307 value,
308 metric,
309 *multi_value_tags,
310 );
311 }
312 }
313 ["tags", field] => {
314 set_metric_tag_values(
315 (*field).to_owned(),
316 &value,
317 metric,
318 *multi_value_tags,
319 );
320 }
321 ["name"] => {
322 let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
323 metric.series.name.name =
324 String::from_utf8_lossy(&value).into_owned();
325 }
326 ["namespace"] => {
327 let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
328 metric.series.name.namespace =
329 Some(String::from_utf8_lossy(&value).into_owned());
330 }
331 ["interval_ms"] => {
332 let value: i64 =
333 value.clone().try_into_i64().map_err(|e| e.to_string())?;
334 let value: u32 = value
335 .try_into()
336 .map_err(|e: TryFromIntError| e.to_string())?;
337 let value = NonZero::try_from(value).map_err(|e| e.to_string())?;
338 metric.data.time.interval_ms = Some(value);
339 }
340 ["timestamp"] => {
341 let value =
342 value.clone().try_timestamp().map_err(|e| e.to_string())?;
343 metric.data.time.timestamp = Some(value);
344 }
345 ["kind"] => {
346 metric.data.kind = MetricKind::try_from(value.clone())?;
347 }
348 _ => {
349 return Err(MetricPathError::InvalidPath {
350 path: &path.to_string(),
351 expected: VALID_METRIC_PATHS_SET,
352 }
353 .to_string());
354 }
355 }
356
357 metric_value.insert(path, value);
358
359 return Ok(());
360 }
361
362 Err(MetricPathError::InvalidPath {
363 path: &path.to_string(),
364 expected: VALID_METRIC_PATHS_SET,
365 }
366 .to_string())
367 }
368 },
369 PathPrefix::Metadata => {
370 self.metadata_mut()
371 .value_mut()
372 .insert(&target_path.path, value);
373 Ok(())
374 }
375 }
376 }
377
378 #[allow(clippy::redundant_closure_for_method_calls)] fn target_get(&self, target_path: &OwnedTargetPath) -> Result<Option<&Value>, String> {
380 match target_path.prefix {
381 PathPrefix::Event => match self {
382 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
383 Ok(log.get(&target_path.path))
384 }
385 VrlTarget::Metric { value, .. } => target_get_metric(&target_path.path, value),
386 },
387 PathPrefix::Metadata => Ok(self.metadata().value().get(&target_path.path)),
388 }
389 }
390
391 fn target_get_mut(
392 &mut self,
393 target_path: &OwnedTargetPath,
394 ) -> Result<Option<&mut Value>, String> {
395 match target_path.prefix {
396 PathPrefix::Event => match self {
397 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
398 Ok(log.get_mut(&target_path.path))
399 }
400 VrlTarget::Metric { value, .. } => target_get_mut_metric(&target_path.path, value),
401 },
402 PathPrefix::Metadata => Ok(self.metadata_mut().value_mut().get_mut(&target_path.path)),
403 }
404 }
405
406 fn target_remove(
407 &mut self,
408 target_path: &OwnedTargetPath,
409 compact: bool,
410 ) -> Result<Option<vrl::value::Value>, String> {
411 match target_path.prefix {
412 PathPrefix::Event => match self {
413 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
414 Ok(log.remove(&target_path.path, compact))
415 }
416 VrlTarget::Metric {
417 metric,
418 value,
419 multi_value_tags: _,
420 } => {
421 if target_path.path.is_root() {
422 return Err(MetricPathError::SetPathError.to_string());
423 }
424
425 if let Some(paths) = target_path
426 .path
427 .to_alternative_components(MAX_METRIC_PATH_DEPTH)
428 {
429 let removed_value = match paths.as_slice() {
430 ["namespace"] => metric.series.name.namespace.take().map(Into::into),
431 ["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
432 ["interval_ms"] => metric
433 .data
434 .time
435 .interval_ms
436 .take()
437 .map(u32::from)
438 .map(Into::into),
439 ["tags"] => metric.series.tags.take().map(|map| {
440 map.into_iter_single()
441 .map(|(k, v)| (k, v.into()))
442 .collect::<vrl::value::Value>()
443 }),
444 ["tags", field] => metric.remove_tag(field).map(Into::into),
445 _ => {
446 return Err(MetricPathError::InvalidPath {
447 path: &target_path.path.to_string(),
448 expected: VALID_METRIC_PATHS_SET,
449 }
450 .to_string());
451 }
452 };
453
454 value.remove(&target_path.path, false);
455
456 Ok(removed_value)
457 } else {
458 Ok(None)
459 }
460 }
461 },
462 PathPrefix::Metadata => Ok(self
463 .metadata_mut()
464 .value_mut()
465 .remove(&target_path.path, compact)),
466 }
467 }
468}
469
470impl SecretTarget for VrlTarget {
471 fn get_secret(&self, key: &str) -> Option<&str> {
472 self.metadata().secrets().get_secret(key)
473 }
474
475 fn insert_secret(&mut self, key: &str, value: &str) {
476 self.metadata_mut().secrets_mut().insert_secret(key, value);
477 }
478
479 fn remove_secret(&mut self, key: &str) {
480 self.metadata_mut().secrets_mut().remove_secret(key);
481 }
482}
483
484fn target_get_metric<'a>(
497 path: &OwnedValuePath,
498 value: &'a Value,
499) -> Result<Option<&'a Value>, String> {
500 if path.is_root() {
501 return Ok(Some(value));
502 }
503
504 let value = value.get(path);
505
506 let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
507 return Ok(None);
508 };
509
510 match paths.as_slice() {
511 ["name"]
512 | ["kind"]
513 | ["type"]
514 | ["tags", _]
515 | ["namespace"]
516 | ["timestamp"]
517 | ["interval_ms"]
518 | ["tags"] => Ok(value),
519 _ => Err(MetricPathError::InvalidPath {
520 path: &path.to_string(),
521 expected: VALID_METRIC_PATHS_GET,
522 }
523 .to_string()),
524 }
525}
526
527fn target_get_mut_metric<'a>(
528 path: &OwnedValuePath,
529 value: &'a mut Value,
530) -> Result<Option<&'a mut Value>, String> {
531 if path.is_root() {
532 return Ok(Some(value));
533 }
534
535 let value = value.get_mut(path);
536
537 let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
538 return Ok(None);
539 };
540
541 match paths.as_slice() {
542 ["name"]
543 | ["kind"]
544 | ["tags", _]
545 | ["namespace"]
546 | ["timestamp"]
547 | ["interval_ms"]
548 | ["tags"] => Ok(value),
549 _ => Err(MetricPathError::InvalidPath {
550 path: &path.to_string(),
551 expected: VALID_METRIC_PATHS_SET,
552 }
553 .to_string()),
554 }
555}
556
557fn precompute_metric_value(metric: &Metric, info: &ProgramInfo, multi_value_tags: bool) -> Value {
562 struct MetricProperty {
563 property: &'static str,
564 getter: fn(&Metric) -> Option<Value>,
565 set: bool,
566 }
567
568 impl MetricProperty {
569 fn new(property: &'static str, getter: fn(&Metric) -> Option<Value>) -> Self {
570 Self {
571 property,
572 getter,
573 set: false,
574 }
575 }
576
577 fn insert(&mut self, metric: &Metric, map: &mut ObjectMap) {
578 if self.set {
579 return;
580 }
581 if let Some(value) = (self.getter)(metric) {
582 map.insert(self.property.into(), value);
583 self.set = true;
584 }
585 }
586 }
587
588 fn get_single_value_tags(metric: &Metric) -> Option<Value> {
589 metric.tags().cloned().map(|tags| {
590 tags.into_iter_single()
591 .map(|(tag, value)| (tag.into(), value.into()))
592 .collect::<ObjectMap>()
593 .into()
594 })
595 }
596
597 fn get_multi_value_tags(metric: &Metric) -> Option<Value> {
598 metric.tags().cloned().map(|tags| {
599 tags.iter_sets()
600 .map(|(tag, tag_set)| {
601 let array_values: Vec<Value> = tag_set
602 .iter()
603 .map(|v| match v {
604 Some(s) => Value::Bytes(s.as_bytes().to_vec().into()),
605 None => Value::Null,
606 })
607 .collect();
608 (tag.into(), Value::Array(array_values))
609 })
610 .collect::<ObjectMap>()
611 .into()
612 })
613 }
614
615 let mut name = MetricProperty::new("name", |metric| Some(metric.name().to_owned().into()));
616 let mut kind = MetricProperty::new("kind", |metric| Some(metric.kind().into()));
617 let mut type_ = MetricProperty::new("type", |metric| Some(metric.value().clone().into()));
618 let mut namespace = MetricProperty::new("namespace", |metric| {
619 metric.namespace().map(String::from).map(Into::into)
620 });
621 let mut interval_ms =
622 MetricProperty::new("interval_ms", |metric| metric.interval_ms().map(Into::into));
623 let mut timestamp =
624 MetricProperty::new("timestamp", |metric| metric.timestamp().map(Into::into));
625 let mut tags = MetricProperty::new(
626 "tags",
627 if multi_value_tags {
628 get_multi_value_tags
629 } else {
630 get_single_value_tags
631 },
632 );
633
634 let mut map = ObjectMap::default();
635
636 for target_path in &info.target_queries {
637 if target_path == &OwnedTargetPath::event_root() {
639 let mut properties = [
640 &mut name,
641 &mut kind,
642 &mut type_,
643 &mut namespace,
644 &mut interval_ms,
645 &mut timestamp,
646 &mut tags,
647 ];
648 for property in &mut properties {
649 property.insert(metric, &mut map);
650 }
651 break;
652 }
653
654 if let Some(OwnedSegment::Field(field)) = target_path.path.segments.first() {
657 let property = match field.as_ref() {
658 "name" => Some(&mut name),
659 "kind" => Some(&mut kind),
660 "type" => Some(&mut type_),
661 "namespace" => Some(&mut namespace),
662 "timestamp" => Some(&mut timestamp),
663 "interval_ms" => Some(&mut interval_ms),
664 "tags" => Some(&mut tags),
665 _ => None,
666 };
667 if let Some(property) = property {
668 property.insert(metric, &mut map);
669 }
670 }
671 }
672
673 map.into()
674}
675
676#[derive(Debug, Snafu)]
677enum MetricPathError<'a> {
678 #[snafu(display("cannot set root path"))]
679 SetPathError,
680
681 #[snafu(display("invalid path {}: expected one of {}", path, expected))]
682 InvalidPath { path: &'a str, expected: &'a str },
683}
684
685#[cfg(test)]
686mod test {
687 use chrono::{Utc, offset::TimeZone};
688 use lookup::owned_value_path;
689 use similar_asserts::assert_eq;
690 use vrl::{btreemap, value::kind::Index};
691
692 use super::{super::MetricValue, *};
693 use crate::metric_tags;
694
695 #[test]
696 fn test_field_definitions_in_message() {
697 let definition =
698 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
699 assert_eq!(
700 Definition::new_with_default_metadata(
701 Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
702 [LogNamespace::Legacy]
703 ),
704 move_field_definitions_into_message(definition)
705 );
706
707 let definition = Definition::new_with_default_metadata(
709 Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
710 [LogNamespace::Legacy],
711 );
712 assert_eq!(
713 Definition::new_with_default_metadata(
714 Kind::object(BTreeMap::from([(
715 "message".into(),
716 Kind::bytes().or_integer()
717 )])),
718 [LogNamespace::Legacy]
719 ),
720 move_field_definitions_into_message(definition)
721 );
722 }
723
724 #[test]
725 fn test_merged_array_definitions_simple() {
726 let object: BTreeMap<vrl::value::kind::Field, Kind> = [
729 ("carrot".into(), Kind::bytes()),
730 ("potato".into(), Kind::integer()),
731 ]
732 .into();
733
734 let kind = Kind::array(Collection::from_unknown(Kind::object(object)));
735
736 let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
737
738 let kind = Kind::object(BTreeMap::from([
739 ("carrot".into(), Kind::bytes()),
740 ("potato".into(), Kind::integer()),
741 ]));
742
743 let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
744 let merged = merge_array_definitions(definition);
745
746 assert_eq!(wanted, merged);
747 }
748
749 #[test]
750 fn test_merged_array_definitions_complex() {
751 let object: BTreeMap<vrl::value::kind::Field, Kind> = [
754 ("carrot".into(), Kind::bytes()),
755 ("potato".into(), Kind::integer()),
756 ]
757 .into();
758
759 let array: BTreeMap<Index, Kind> = [
760 (Index::from(0), Kind::integer()),
761 (Index::from(1), Kind::boolean()),
762 (
763 Index::from(2),
764 Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
765 ),
766 ]
767 .into();
768
769 let mut kind = Kind::bytes();
770 kind.add_object(object);
771 kind.add_array(array);
772
773 let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
774
775 let mut kind = Kind::bytes();
776 kind.add_integer();
777 kind.add_boolean();
778 kind.add_object(BTreeMap::from([
779 ("carrot".into(), Kind::bytes().or_undefined()),
780 ("potato".into(), Kind::integer().or_undefined()),
781 ("peas".into(), Kind::bytes().or_undefined()),
782 ]));
783
784 let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
785 let merged = merge_array_definitions(definition);
786
787 assert_eq!(wanted, merged);
788 }
789
790 #[test]
791 fn log_get() {
792 let cases = vec![
793 (
794 BTreeMap::new(),
795 owned_value_path!(),
796 Ok(Some(BTreeMap::new().into())),
797 ),
798 (
799 BTreeMap::from([("foo".into(), "bar".into())]),
800 owned_value_path!(),
801 Ok(Some(BTreeMap::from([("foo".into(), "bar".into())]).into())),
802 ),
803 (
804 BTreeMap::from([("foo".into(), "bar".into())]),
805 owned_value_path!("foo"),
806 Ok(Some("bar".into())),
807 ),
808 (
809 BTreeMap::from([("foo".into(), "bar".into())]),
810 owned_value_path!("bar"),
811 Ok(None),
812 ),
813 (
814 btreemap! { "foo" => vec![btreemap! { "bar" => true }] },
815 owned_value_path!("foo", 0, "bar"),
816 Ok(Some(true.into())),
817 ),
818 (
819 btreemap! { "foo" => btreemap! { "bar baz" => btreemap! { "baz" => 2 } } },
820 owned_value_path!("foo", r"bar baz", "baz"),
821 Ok(Some(2.into())),
822 ),
823 ];
824
825 for (value, path, expect) in cases {
826 let value: ObjectMap = value;
827 let info = ProgramInfo {
828 fallible: false,
829 abortable: false,
830 target_queries: vec![],
831 target_assignments: vec![],
832 };
833 let target = VrlTarget::new(Event::Log(LogEvent::from(value)), &info, false);
834 let path = OwnedTargetPath::event(path);
835
836 assert_eq!(
837 Target::target_get(&target, &path).map(Option::<&Value>::cloned),
838 expect
839 );
840 }
841 }
842
843 #[allow(clippy::too_many_lines)]
844 #[test]
845 fn log_insert() {
846 let cases = vec![
847 (
848 BTreeMap::from([("foo".into(), "bar".into())]),
849 owned_value_path!(0),
850 btreemap! { "baz" => "qux" }.into(),
851 btreemap! { "baz" => "qux" },
852 Ok(()),
853 ),
854 (
855 BTreeMap::from([("foo".into(), "bar".into())]),
856 owned_value_path!("foo"),
857 "baz".into(),
858 btreemap! { "foo" => "baz" },
859 Ok(()),
860 ),
861 (
862 BTreeMap::from([("foo".into(), "bar".into())]),
863 owned_value_path!("foo", 2, "bar baz", "a", "b"),
864 true.into(),
865 btreemap! {
866 "foo" => vec![
867 Value::Null,
868 Value::Null,
869 btreemap! {
870 "bar baz" => btreemap! { "a" => btreemap! { "b" => true } },
871 }.into()
872 ]
873 },
874 Ok(()),
875 ),
876 (
877 btreemap! { "foo" => vec![0, 1, 2] },
878 owned_value_path!("foo", 5),
879 "baz".into(),
880 btreemap! {
881 "foo" => vec![
882 0.into(),
883 1.into(),
884 2.into(),
885 Value::Null,
886 Value::Null,
887 Value::from("baz"),
888 ],
889 },
890 Ok(()),
891 ),
892 (
893 BTreeMap::from([("foo".into(), "bar".into())]),
894 owned_value_path!("foo", 0),
895 "baz".into(),
896 btreemap! { "foo" => vec!["baz"] },
897 Ok(()),
898 ),
899 (
900 btreemap! { "foo" => Value::Array(vec![]) },
901 owned_value_path!("foo", 0),
902 "baz".into(),
903 btreemap! { "foo" => vec!["baz"] },
904 Ok(()),
905 ),
906 (
907 btreemap! { "foo" => Value::Array(vec![0.into()]) },
908 owned_value_path!("foo", 0),
909 "baz".into(),
910 btreemap! { "foo" => vec!["baz"] },
911 Ok(()),
912 ),
913 (
914 btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
915 owned_value_path!("foo", 0),
916 "baz".into(),
917 btreemap! { "foo" => Value::Array(vec!["baz".into(), 1.into()]) },
918 Ok(()),
919 ),
920 (
921 btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
922 owned_value_path!("foo", 1),
923 "baz".into(),
924 btreemap! { "foo" => Value::Array(vec![0.into(), "baz".into()]) },
925 Ok(()),
926 ),
927 ];
928
929 for (object, path, value, expect, result) in cases {
930 let object: ObjectMap = object;
931 let info = ProgramInfo {
932 fallible: false,
933 abortable: false,
934 target_queries: vec![],
935 target_assignments: vec![],
936 };
937 let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
938 let expect = LogEvent::from(expect);
939 let value: Value = value;
940 let path = OwnedTargetPath::event(path);
941
942 assert_eq!(
943 Target::target_insert(&mut target, &path, value.clone()),
944 result
945 );
946 assert_eq!(
947 Target::target_get(&target, &path).map(Option::<&Value>::cloned),
948 Ok(Some(value))
949 );
950 assert_eq!(
951 match target.into_events(LogNamespace::Legacy) {
952 TargetEvents::One(event) => vec![event],
953 TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
954 TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
955 }
956 .first()
957 .cloned()
958 .unwrap(),
959 Event::Log(expect)
960 );
961 }
962 }
963
964 #[test]
965 fn log_remove() {
966 let cases = vec![
967 (
968 BTreeMap::from([("foo".into(), "bar".into())]),
969 owned_value_path!("foo"),
970 false,
971 Some(BTreeMap::new().into()),
972 ),
973 (
974 BTreeMap::from([("foo".into(), "bar".into())]),
975 owned_value_path!(r"foo bar", "foo"),
976 false,
977 Some(btreemap! { "foo" => "bar"}.into()),
978 ),
979 (
980 btreemap! { "foo" => "bar", "baz" => "qux" },
981 owned_value_path!(),
982 false,
983 Some(BTreeMap::new().into()),
984 ),
985 (
986 btreemap! { "foo" => "bar", "baz" => "qux" },
987 owned_value_path!(),
988 true,
989 Some(BTreeMap::new().into()),
990 ),
991 (
992 btreemap! { "foo" => vec![0] },
993 owned_value_path!("foo", 0),
994 false,
995 Some(btreemap! { "foo" => Value::Array(vec![]) }.into()),
996 ),
997 (
998 btreemap! { "foo" => vec![0] },
999 owned_value_path!("foo", 0),
1000 true,
1001 Some(BTreeMap::new().into()),
1002 ),
1003 (
1004 btreemap! {
1005 "foo" => btreemap! { "bar baz" => vec![0] },
1006 "bar" => "baz",
1007 },
1008 owned_value_path!("foo", r"bar baz", 0),
1009 false,
1010 Some(
1011 btreemap! {
1012 "foo" => btreemap! { "bar baz" => Value::Array(vec![]) },
1013 "bar" => "baz",
1014 }
1015 .into(),
1016 ),
1017 ),
1018 (
1019 btreemap! {
1020 "foo" => btreemap! { "bar baz" => vec![0] },
1021 "bar" => "baz",
1022 },
1023 owned_value_path!("foo", r"bar baz", 0),
1024 true,
1025 Some(btreemap! { "bar" => "baz" }.into()),
1026 ),
1027 ];
1028
1029 for (object, path, compact, expect) in cases {
1030 let info = ProgramInfo {
1031 fallible: false,
1032 abortable: false,
1033 target_queries: vec![],
1034 target_assignments: vec![],
1035 };
1036 let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
1037 let path = OwnedTargetPath::event(path);
1038 let removed = Target::target_get(&target, &path).unwrap().cloned();
1039
1040 assert_eq!(
1041 Target::target_remove(&mut target, &path, compact),
1042 Ok(removed)
1043 );
1044 assert_eq!(
1045 Target::target_get(&target, &OwnedTargetPath::event_root())
1046 .map(Option::<&Value>::cloned),
1047 Ok(expect)
1048 );
1049 }
1050 }
1051
1052 #[test]
1053 fn log_into_events() {
1054 use vrl::btreemap;
1055
1056 let cases = vec![
1057 (
1058 Value::from(btreemap! {"foo" => "bar"}),
1059 vec![btreemap! {"foo" => "bar"}],
1060 ),
1061 (Value::from(1), vec![btreemap! {"message" => 1}]),
1062 (Value::from("2"), vec![btreemap! {"message" => "2"}]),
1063 (Value::from(true), vec![btreemap! {"message" => true}]),
1064 (
1065 Value::from(vec![
1066 Value::from(1),
1067 Value::from("2"),
1068 Value::from(true),
1069 Value::from(btreemap! {"foo" => "bar"}),
1070 ]),
1071 vec![
1072 btreemap! {"message" => 1},
1073 btreemap! {"message" => "2"},
1074 btreemap! {"message" => true},
1075 btreemap! {"foo" => "bar"},
1076 ],
1077 ),
1078 ];
1079
1080 for (value, expect) in cases {
1081 let metadata = EventMetadata::default();
1082 let info = ProgramInfo {
1083 fallible: false,
1084 abortable: false,
1085 target_queries: vec![],
1086 target_assignments: vec![],
1087 };
1088 let mut target = VrlTarget::new(
1089 Event::Log(LogEvent::new_with_metadata(metadata.clone())),
1090 &info,
1091 false,
1092 );
1093
1094 Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();
1095
1096 assert_eq!(
1097 match target.into_events(LogNamespace::Legacy) {
1098 TargetEvents::One(event) => vec![event],
1099 TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
1100 TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
1101 },
1102 expect
1103 .into_iter()
1104 .map(|v| Event::Log(LogEvent::from_map(v, metadata.clone())))
1105 .collect::<Vec<_>>()
1106 );
1107 }
1108 }
1109
1110 #[test]
1111 fn metric_all_fields() {
1112 let metric = Metric::new(
1113 "zub",
1114 MetricKind::Absolute,
1115 MetricValue::Counter { value: 1.23 },
1116 )
1117 .with_namespace(Some("zoob"))
1118 .with_tags(Some(metric_tags!("tig" => "tog")))
1119 .with_timestamp(Some(
1120 Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0)
1121 .single()
1122 .expect("invalid timestamp"),
1123 ))
1124 .with_interval_ms(Some(NonZero::<u32>::new(507).unwrap()));
1125
1126 let info = ProgramInfo {
1127 fallible: false,
1128 abortable: false,
1129 target_queries: vec![
1130 OwnedTargetPath::event(owned_value_path!("name")),
1131 OwnedTargetPath::event(owned_value_path!("namespace")),
1132 OwnedTargetPath::event(owned_value_path!("interval_ms")),
1133 OwnedTargetPath::event(owned_value_path!("timestamp")),
1134 OwnedTargetPath::event(owned_value_path!("kind")),
1135 OwnedTargetPath::event(owned_value_path!("type")),
1136 OwnedTargetPath::event(owned_value_path!("tags")),
1137 ],
1138 target_assignments: vec![],
1139 };
1140 let target = VrlTarget::new(Event::Metric(metric), &info, false);
1141
1142 assert_eq!(
1143 Ok(Some(
1144 btreemap! {
1145 "name" => "zub",
1146 "namespace" => "zoob",
1147 "interval_ms" => 507,
1148 "timestamp" => Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0).single().expect("invalid timestamp"),
1149 "tags" => btreemap! { "tig" => "tog" },
1150 "kind" => "absolute",
1151 "type" => "counter",
1152 }
1153 .into()
1154 )),
1155 target
1156 .target_get(&OwnedTargetPath::event_root())
1157 .map(Option::<&Value>::cloned)
1158 );
1159 }
1160
1161 #[test]
1162 fn metric_fields() {
1163 struct Case {
1164 path: OwnedValuePath,
1165 current: Option<Value>,
1166 new: Value,
1167 delete: bool,
1168 }
1169
1170 let metric = Metric::new(
1171 "name",
1172 MetricKind::Absolute,
1173 MetricValue::Counter { value: 1.23 },
1174 )
1175 .with_tags(Some(metric_tags!("tig" => "tog")));
1176
1177 let cases = vec![
1178 Case {
1179 path: owned_value_path!("name"),
1180 current: Some(Value::from("name")),
1181 new: Value::from("namefoo"),
1182 delete: false,
1183 },
1184 Case {
1185 path: owned_value_path!("namespace"),
1186 current: None,
1187 new: "namespacefoo".into(),
1188 delete: true,
1189 },
1190 Case {
1191 path: owned_value_path!("timestamp"),
1192 current: None,
1193 new: Utc
1194 .with_ymd_and_hms(2020, 12, 8, 12, 0, 0)
1195 .single()
1196 .expect("invalid timestamp")
1197 .into(),
1198 delete: true,
1199 },
1200 Case {
1201 path: owned_value_path!("interval_ms"),
1202 current: None,
1203 new: 123_456.into(),
1204 delete: true,
1205 },
1206 Case {
1207 path: owned_value_path!("kind"),
1208 current: Some(Value::from("absolute")),
1209 new: "incremental".into(),
1210 delete: false,
1211 },
1212 Case {
1213 path: owned_value_path!("tags", "thing"),
1214 current: None,
1215 new: "footag".into(),
1216 delete: true,
1217 },
1218 ];
1219
1220 let info = ProgramInfo {
1221 fallible: false,
1222 abortable: false,
1223 target_queries: vec![
1224 OwnedTargetPath::event(owned_value_path!("name")),
1225 OwnedTargetPath::event(owned_value_path!("namespace")),
1226 OwnedTargetPath::event(owned_value_path!("timestamp")),
1227 OwnedTargetPath::event(owned_value_path!("interval_ms")),
1228 OwnedTargetPath::event(owned_value_path!("kind")),
1229 ],
1230 target_assignments: vec![],
1231 };
1232 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1233
1234 for Case {
1235 path,
1236 current,
1237 new,
1238 delete,
1239 } in cases
1240 {
1241 let path = OwnedTargetPath::event(path);
1242
1243 assert_eq!(
1244 Ok(current),
1245 target.target_get(&path).map(Option::<&Value>::cloned)
1246 );
1247 assert_eq!(Ok(()), target.target_insert(&path, new.clone()));
1248 assert_eq!(
1249 Ok(Some(new.clone())),
1250 target.target_get(&path).map(Option::<&Value>::cloned)
1251 );
1252
1253 if delete {
1254 assert_eq!(Ok(Some(new)), target.target_remove(&path, true));
1255 assert_eq!(
1256 Ok(None),
1257 target.target_get(&path).map(Option::<&Value>::cloned)
1258 );
1259 }
1260 }
1261 }
1262
1263 #[test]
1264 fn metric_set_tags() {
1265 let metric = Metric::new(
1266 "name",
1267 MetricKind::Absolute,
1268 MetricValue::Counter { value: 1.23 },
1269 )
1270 .with_tags(Some(metric_tags!("tig" => "tog")));
1271
1272 let info = ProgramInfo {
1273 fallible: false,
1274 abortable: false,
1275 target_queries: vec![],
1276 target_assignments: vec![],
1277 };
1278 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1279 let _result = target.target_insert(
1280 &OwnedTargetPath::event(owned_value_path!("tags")),
1281 Value::Object(BTreeMap::from([("a".into(), "b".into())])),
1282 );
1283
1284 match target {
1285 VrlTarget::Metric {
1286 metric,
1287 value: _,
1288 multi_value_tags: _,
1289 } => {
1290 assert!(metric.tags().is_some());
1291 assert_eq!(metric.tags().unwrap(), &crate::metric_tags!("a" => "b"));
1292 }
1293 _ => panic!("must be a metric"),
1294 }
1295 }
1296
1297 #[test]
1298 fn metric_invalid_paths() {
1299 let metric = Metric::new(
1300 "name",
1301 MetricKind::Absolute,
1302 MetricValue::Counter { value: 1.23 },
1303 );
1304
1305 let validpaths_get = [
1306 ".name",
1307 ".namespace",
1308 ".interval_ms",
1309 ".timestamp",
1310 ".kind",
1311 ".tags",
1312 ".type",
1313 ];
1314
1315 let validpaths_set = [
1316 ".name",
1317 ".namespace",
1318 ".interval_ms",
1319 ".timestamp",
1320 ".kind",
1321 ".tags",
1322 ];
1323
1324 let info = ProgramInfo {
1325 fallible: false,
1326 abortable: false,
1327 target_queries: vec![],
1328 target_assignments: vec![],
1329 };
1330 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1331
1332 assert_eq!(
1333 Err(format!(
1334 "invalid path zork: expected one of {}",
1335 validpaths_get.join(", ")
1336 )),
1337 target.target_get(&OwnedTargetPath::event(owned_value_path!("zork")))
1338 );
1339
1340 assert_eq!(
1341 Err(format!(
1342 "invalid path zork: expected one of {}",
1343 validpaths_set.join(", ")
1344 )),
1345 target.target_insert(
1346 &OwnedTargetPath::event(owned_value_path!("zork")),
1347 "thing".into()
1348 )
1349 );
1350
1351 assert_eq!(
1352 Err(format!(
1353 "invalid path zork: expected one of {}",
1354 validpaths_set.join(", ")
1355 )),
1356 target.target_remove(&OwnedTargetPath::event(owned_value_path!("zork")), true)
1357 );
1358
1359 assert_eq!(
1360 Err(format!(
1361 "invalid path tags.foo.flork: expected one of {}",
1362 validpaths_get.join(", ")
1363 )),
1364 target.target_get(&OwnedTargetPath::event(owned_value_path!(
1365 "tags", "foo", "flork"
1366 )))
1367 );
1368 }
1369
1370 #[test]
1371 fn test_metric_insert_get_multi_value_tag() {
1372 let metric = Metric::new(
1373 "name",
1374 MetricKind::Absolute,
1375 MetricValue::Counter { value: 1.23 },
1376 );
1377 let info = ProgramInfo {
1378 fallible: false,
1379 abortable: false,
1380 target_queries: vec![],
1381 target_assignments: vec![],
1382 };
1383
1384 let mut target = VrlTarget::new(Event::Metric(metric), &info, true);
1385
1386 let value = Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()]);
1387 target
1388 .target_insert(
1389 &OwnedTargetPath::event(owned_value_path!("tags", "foo")),
1390 value,
1391 )
1392 .unwrap();
1393
1394 let vrl_tags_value = target
1395 .target_get(&OwnedTargetPath::event(owned_value_path!("tags")))
1396 .unwrap()
1397 .unwrap();
1398
1399 assert_eq!(
1400 vrl_tags_value,
1401 &Value::Object(BTreeMap::from([(
1402 "foo".into(),
1403 Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()])
1404 )]))
1405 );
1406
1407 let VrlTarget::Metric { metric, .. } = target else {
1408 unreachable!()
1409 };
1410
1411 assert_eq!(metric.tag_value("foo"), Some("b".into()));
1413 }
1414}