1use std::borrow::Cow;
2use std::num::{NonZero, TryFromIntError};
3use std::{collections::BTreeMap, convert::TryFrom, marker::PhantomData};
4
5use lookup::lookup_v2::OwnedSegment;
6use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix};
7use snafu::Snafu;
8use vrl::compiler::value::VrlValueConvert;
9use vrl::compiler::{ProgramInfo, SecretTarget, Target};
10use vrl::prelude::Collection;
11use vrl::value::{Kind, ObjectMap, Value};
12
13use super::{metric::TagValue, Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent};
14use crate::config::{log_schema, LogNamespace};
15use crate::schema::Definition;
16
/// Metric paths that can be assigned to; listed in the error message for an
/// invalid insert/remove path.
const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .interval_ms, .timestamp, .kind, .tags";

/// Metric paths that can be read: the settable paths plus `.type`, which the
/// insert code below does not accept.
const VALID_METRIC_PATHS_GET: &str =
    ".name, .namespace, .interval_ms, .timestamp, .kind, .tags, .type";

/// Depth limit passed to `to_alternative_components` when resolving metric paths.
const MAX_METRIC_PATH_DEPTH: usize = 3;
28
/// The event representation that a VRL program reads from and writes to.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum VrlTarget {
    /// A log event, split into its root value and its metadata.
    LogEvent(Value, EventMetadata),
    /// A metric together with a VRL view of its fields.
    Metric {
        /// The underlying metric; inserts and removals are applied to it.
        metric: Metric,
        /// `Value` mirror of the metric built by `precompute_metric_value`;
        /// reads are served from it and it is updated on insert/remove.
        value: Value,
        /// Whether tags are exposed as arrays of values rather than single values.
        multi_value_tags: bool,
    },
    /// A trace event, stored as root value plus metadata.
    Trace(Value, EventMetadata),
}
43
/// The event(s) produced by `VrlTarget::into_events`.
pub enum TargetEvents {
    /// Exactly one event.
    One(Event),
    /// Log events produced from the elements of an array root value.
    Logs(TargetIter<LogEvent>),
    /// Trace events produced from the elements of an array root value.
    Traces(TargetIter<TraceEvent>),
}
49
/// Iterator that converts each element of an array root value into an `Event`.
///
/// `T` selects the kind of event produced; see the `Iterator` impls for
/// `TargetIter<LogEvent>` and `TargetIter<TraceEvent>`.
pub struct TargetIter<T> {
    /// Remaining array elements.
    iter: std::vec::IntoIter<Value>,
    /// Metadata cloned onto every produced event.
    metadata: EventMetadata,
    /// Carries the event type `T`; no value of `T` is stored.
    _marker: PhantomData<T>,
    /// Namespace consulted by the log iterator when deciding whether a
    /// non-object element is wrapped under the message key.
    log_namespace: LogNamespace,
}
56
57fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
58 let mut log = LogEvent::new_with_metadata(metadata);
59 log.maybe_insert(log_schema().message_key_target_path(), value);
60 log
61}
62
63impl Iterator for TargetIter<LogEvent> {
64 type Item = Event;
65
66 fn next(&mut self) -> Option<Self::Item> {
67 self.iter.next().map(|v| {
68 match self.log_namespace {
69 LogNamespace::Legacy => match v {
70 value @ Value::Object(_) => LogEvent::from_parts(value, self.metadata.clone()),
71 value => create_log_event(value, self.metadata.clone()),
72 },
73 LogNamespace::Vector => LogEvent::from_parts(v, self.metadata.clone()),
74 }
75 .into()
76 })
77 }
78}
79
80impl Iterator for TargetIter<TraceEvent> {
81 type Item = Event;
82
83 fn next(&mut self) -> Option<Self::Item> {
84 self.iter.next().map(|v| {
85 match v {
86 value @ Value::Object(_) => {
87 TraceEvent::from(LogEvent::from_parts(value, self.metadata.clone()))
88 }
89 value => TraceEvent::from(create_log_event(value, self.metadata.clone())),
90 }
91 .into()
92 })
93 }
94}
95
96impl VrlTarget {
97 pub fn new(event: Event, info: &ProgramInfo, multi_value_metric_tags: bool) -> Self {
98 match event {
99 Event::Log(event) => {
100 let (value, metadata) = event.into_parts();
101 VrlTarget::LogEvent(value, metadata)
102 }
103 Event::Metric(metric) => {
104 let value = precompute_metric_value(&metric, info, multi_value_metric_tags);
108
109 VrlTarget::Metric {
110 metric,
111 value,
112 multi_value_tags: multi_value_metric_tags,
113 }
114 }
115 Event::Trace(event) => {
116 let (fields, metadata) = event.into_parts();
117 VrlTarget::Trace(Value::Object(fields), metadata)
118 }
119 }
120 }
121
122 pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
124 let log_namespaces = input.log_namespaces().clone();
125
126 let merged_arrays = merge_array_definitions(input);
128 Definition::combine_log_namespaces(
129 &log_namespaces,
130 move_field_definitions_into_message(merged_arrays.clone()),
131 merged_arrays,
132 )
133 }
134
135 pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
140 match self {
141 VrlTarget::LogEvent(value, metadata) => match value {
142 value @ Value::Object(_) => {
143 TargetEvents::One(LogEvent::from_parts(value, metadata).into())
144 }
145
146 Value::Array(values) => TargetEvents::Logs(TargetIter {
147 iter: values.into_iter(),
148 metadata,
149 _marker: PhantomData,
150 log_namespace,
151 }),
152
153 v => match log_namespace {
154 LogNamespace::Vector => {
155 TargetEvents::One(LogEvent::from_parts(v, metadata).into())
156 }
157 LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()),
158 },
159 },
160 VrlTarget::Trace(value, metadata) => match value {
161 value @ Value::Object(_) => {
162 let log = LogEvent::from_parts(value, metadata);
163 TargetEvents::One(TraceEvent::from(log).into())
164 }
165
166 Value::Array(values) => TargetEvents::Traces(TargetIter {
167 iter: values.into_iter(),
168 metadata,
169 _marker: PhantomData,
170 log_namespace,
171 }),
172
173 v => TargetEvents::One(create_log_event(v, metadata).into()),
174 },
175 VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)),
176 }
177 }
178
179 fn metadata(&self) -> &EventMetadata {
180 match self {
181 VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
182 VrlTarget::Metric { metric, .. } => metric.metadata(),
183 }
184 }
185
186 fn metadata_mut(&mut self) -> &mut EventMetadata {
187 match self {
188 VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
189 VrlTarget::Metric { metric, .. } => metric.metadata_mut(),
190 }
191 }
192}
193
194fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
197 let mut message = definition.event_kind().clone();
198 message.remove_object();
199 message.remove_array();
200
201 if !message.is_never() {
202 if let Some(message_key) = log_schema().message_key() {
203 let message = Kind::object(Collection::from(BTreeMap::from([(
206 message_key.to_string().into(),
207 message,
208 )])));
209
210 definition.event_kind_mut().remove_bytes();
211 definition.event_kind_mut().remove_integer();
212 definition.event_kind_mut().remove_float();
213 definition.event_kind_mut().remove_boolean();
214 definition.event_kind_mut().remove_timestamp();
215 definition.event_kind_mut().remove_regex();
216 definition.event_kind_mut().remove_null();
217
218 *definition.event_kind_mut() = definition.event_kind().union(message);
219 }
220 }
221
222 definition
223}
224
225fn merge_array_definitions(mut definition: Definition) -> Definition {
232 if let Some(array) = definition.event_kind().as_array() {
233 let array_kinds = array.reduced_kind();
234
235 let kind = definition.event_kind_mut();
236 kind.remove_array();
237 *kind = kind.union(array_kinds);
238 }
239
240 definition
241}
242
243fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
244 if multi_value_tags {
245 let values = if let Value::Array(values) = value {
246 values.as_slice()
247 } else {
248 std::slice::from_ref(value)
249 };
250
251 let tag_values = values
252 .iter()
253 .filter_map(|value| match value {
254 Value::Bytes(bytes) => {
255 Some(TagValue::Value(String::from_utf8_lossy(bytes).to_string()))
256 }
257 Value::Null => Some(TagValue::Bare),
258 _ => None,
259 })
260 .collect::<Vec<_>>();
261
262 metric.set_multi_value_tag(name, tag_values);
263 } else {
264 if let Ok(tag_value) = value.try_bytes_utf8_lossy().map(Cow::into_owned) {
266 metric.replace_tag(name, tag_value);
267 } else if value.is_null() {
268 metric.set_multi_value_tag(name, vec![TagValue::Bare]);
269 }
270 }
271}
272
273impl Target for VrlTarget {
274 fn target_insert(&mut self, target_path: &OwnedTargetPath, value: Value) -> Result<(), String> {
275 let path = &target_path.path;
276 match target_path.prefix {
277 PathPrefix::Event => match self {
278 VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
279 log.insert(path, value);
280 Ok(())
281 }
282 VrlTarget::Metric {
283 ref mut metric,
284 value: metric_value,
285 multi_value_tags,
286 } => {
287 if path.is_root() {
288 return Err(MetricPathError::SetPathError.to_string());
289 }
290
291 if let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
292 match paths.as_slice() {
293 ["tags"] => {
294 let value =
295 value.clone().try_object().map_err(|e| e.to_string())?;
296
297 metric.remove_tags();
298 for (field, value) in &value {
299 set_metric_tag_values(
300 field[..].into(),
301 value,
302 metric,
303 *multi_value_tags,
304 );
305 }
306 }
307 ["tags", field] => {
308 set_metric_tag_values(
309 (*field).to_owned(),
310 &value,
311 metric,
312 *multi_value_tags,
313 );
314 }
315 ["name"] => {
316 let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
317 metric.series.name.name =
318 String::from_utf8_lossy(&value).into_owned();
319 }
320 ["namespace"] => {
321 let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
322 metric.series.name.namespace =
323 Some(String::from_utf8_lossy(&value).into_owned());
324 }
325 ["interval_ms"] => {
326 let value: i64 =
327 value.clone().try_into_i64().map_err(|e| e.to_string())?;
328 let value: u32 = value
329 .try_into()
330 .map_err(|e: TryFromIntError| e.to_string())?;
331 let value = NonZero::try_from(value).map_err(|e| e.to_string())?;
332 metric.data.time.interval_ms = Some(value);
333 }
334 ["timestamp"] => {
335 let value =
336 value.clone().try_timestamp().map_err(|e| e.to_string())?;
337 metric.data.time.timestamp = Some(value);
338 }
339 ["kind"] => {
340 metric.data.kind = MetricKind::try_from(value.clone())?;
341 }
342 _ => {
343 return Err(MetricPathError::InvalidPath {
344 path: &path.to_string(),
345 expected: VALID_METRIC_PATHS_SET,
346 }
347 .to_string())
348 }
349 }
350
351 metric_value.insert(path, value);
352
353 return Ok(());
354 }
355
356 Err(MetricPathError::InvalidPath {
357 path: &path.to_string(),
358 expected: VALID_METRIC_PATHS_SET,
359 }
360 .to_string())
361 }
362 },
363 PathPrefix::Metadata => {
364 self.metadata_mut()
365 .value_mut()
366 .insert(&target_path.path, value);
367 Ok(())
368 }
369 }
370 }
371
372 #[allow(clippy::redundant_closure_for_method_calls)] fn target_get(&self, target_path: &OwnedTargetPath) -> Result<Option<&Value>, String> {
374 match target_path.prefix {
375 PathPrefix::Event => match self {
376 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
377 Ok(log.get(&target_path.path))
378 }
379 VrlTarget::Metric { value, .. } => target_get_metric(&target_path.path, value),
380 },
381 PathPrefix::Metadata => Ok(self.metadata().value().get(&target_path.path)),
382 }
383 }
384
385 fn target_get_mut(
386 &mut self,
387 target_path: &OwnedTargetPath,
388 ) -> Result<Option<&mut Value>, String> {
389 match target_path.prefix {
390 PathPrefix::Event => match self {
391 VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
392 Ok(log.get_mut(&target_path.path))
393 }
394 VrlTarget::Metric { value, .. } => target_get_mut_metric(&target_path.path, value),
395 },
396 PathPrefix::Metadata => Ok(self.metadata_mut().value_mut().get_mut(&target_path.path)),
397 }
398 }
399
400 fn target_remove(
401 &mut self,
402 target_path: &OwnedTargetPath,
403 compact: bool,
404 ) -> Result<Option<vrl::value::Value>, String> {
405 match target_path.prefix {
406 PathPrefix::Event => match self {
407 VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
408 Ok(log.remove(&target_path.path, compact))
409 }
410 VrlTarget::Metric {
411 ref mut metric,
412 value,
413 multi_value_tags: _,
414 } => {
415 if target_path.path.is_root() {
416 return Err(MetricPathError::SetPathError.to_string());
417 }
418
419 if let Some(paths) = target_path
420 .path
421 .to_alternative_components(MAX_METRIC_PATH_DEPTH)
422 {
423 let removed_value = match paths.as_slice() {
424 ["namespace"] => metric.series.name.namespace.take().map(Into::into),
425 ["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
426 ["interval_ms"] => metric
427 .data
428 .time
429 .interval_ms
430 .take()
431 .map(u32::from)
432 .map(Into::into),
433 ["tags"] => metric.series.tags.take().map(|map| {
434 map.into_iter_single()
435 .map(|(k, v)| (k, v.into()))
436 .collect::<vrl::value::Value>()
437 }),
438 ["tags", field] => metric.remove_tag(field).map(Into::into),
439 _ => {
440 return Err(MetricPathError::InvalidPath {
441 path: &target_path.path.to_string(),
442 expected: VALID_METRIC_PATHS_SET,
443 }
444 .to_string())
445 }
446 };
447
448 value.remove(&target_path.path, false);
449
450 Ok(removed_value)
451 } else {
452 Ok(None)
453 }
454 }
455 },
456 PathPrefix::Metadata => Ok(self
457 .metadata_mut()
458 .value_mut()
459 .remove(&target_path.path, compact)),
460 }
461 }
462}
463
464impl SecretTarget for VrlTarget {
465 fn get_secret(&self, key: &str) -> Option<&str> {
466 self.metadata().secrets().get_secret(key)
467 }
468
469 fn insert_secret(&mut self, key: &str, value: &str) {
470 self.metadata_mut().secrets_mut().insert_secret(key, value);
471 }
472
473 fn remove_secret(&mut self, key: &str) {
474 self.metadata_mut().secrets_mut().remove_secret(key);
475 }
476}
477
478fn target_get_metric<'a>(
491 path: &OwnedValuePath,
492 value: &'a Value,
493) -> Result<Option<&'a Value>, String> {
494 if path.is_root() {
495 return Ok(Some(value));
496 }
497
498 let value = value.get(path);
499
500 let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
501 return Ok(None);
502 };
503
504 match paths.as_slice() {
505 ["name"]
506 | ["kind"]
507 | ["type"]
508 | ["tags", _]
509 | ["namespace"]
510 | ["timestamp"]
511 | ["interval_ms"]
512 | ["tags"] => Ok(value),
513 _ => Err(MetricPathError::InvalidPath {
514 path: &path.to_string(),
515 expected: VALID_METRIC_PATHS_GET,
516 }
517 .to_string()),
518 }
519}
520
521fn target_get_mut_metric<'a>(
522 path: &OwnedValuePath,
523 value: &'a mut Value,
524) -> Result<Option<&'a mut Value>, String> {
525 if path.is_root() {
526 return Ok(Some(value));
527 }
528
529 let value = value.get_mut(path);
530
531 let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
532 return Ok(None);
533 };
534
535 match paths.as_slice() {
536 ["name"]
537 | ["kind"]
538 | ["tags", _]
539 | ["namespace"]
540 | ["timestamp"]
541 | ["interval_ms"]
542 | ["tags"] => Ok(value),
543 _ => Err(MetricPathError::InvalidPath {
544 path: &path.to_string(),
545 expected: VALID_METRIC_PATHS_SET,
546 }
547 .to_string()),
548 }
549}
550
551fn precompute_metric_value(metric: &Metric, info: &ProgramInfo, multi_value_tags: bool) -> Value {
556 struct MetricProperty {
557 property: &'static str,
558 getter: fn(&Metric) -> Option<Value>,
559 set: bool,
560 }
561
562 impl MetricProperty {
563 fn new(property: &'static str, getter: fn(&Metric) -> Option<Value>) -> Self {
564 Self {
565 property,
566 getter,
567 set: false,
568 }
569 }
570
571 fn insert(&mut self, metric: &Metric, map: &mut ObjectMap) {
572 if self.set {
573 return;
574 }
575 if let Some(value) = (self.getter)(metric) {
576 map.insert(self.property.into(), value);
577 self.set = true;
578 }
579 }
580 }
581
582 fn get_single_value_tags(metric: &Metric) -> Option<Value> {
583 metric.tags().cloned().map(|tags| {
584 tags.into_iter_single()
585 .map(|(tag, value)| (tag.into(), value.into()))
586 .collect::<ObjectMap>()
587 .into()
588 })
589 }
590
591 fn get_multi_value_tags(metric: &Metric) -> Option<Value> {
592 metric.tags().cloned().map(|tags| {
593 tags.iter_sets()
594 .map(|(tag, tag_set)| {
595 let array_values: Vec<Value> = tag_set
596 .iter()
597 .map(|v| match v {
598 Some(s) => Value::Bytes(s.as_bytes().to_vec().into()),
599 None => Value::Null,
600 })
601 .collect();
602 (tag.into(), Value::Array(array_values))
603 })
604 .collect::<ObjectMap>()
605 .into()
606 })
607 }
608
609 let mut name = MetricProperty::new("name", |metric| Some(metric.name().to_owned().into()));
610 let mut kind = MetricProperty::new("kind", |metric| Some(metric.kind().into()));
611 let mut type_ = MetricProperty::new("type", |metric| Some(metric.value().clone().into()));
612 let mut namespace = MetricProperty::new("namespace", |metric| {
613 metric.namespace().map(String::from).map(Into::into)
614 });
615 let mut interval_ms =
616 MetricProperty::new("interval_ms", |metric| metric.interval_ms().map(Into::into));
617 let mut timestamp =
618 MetricProperty::new("timestamp", |metric| metric.timestamp().map(Into::into));
619 let mut tags = MetricProperty::new(
620 "tags",
621 if multi_value_tags {
622 get_multi_value_tags
623 } else {
624 get_single_value_tags
625 },
626 );
627
628 let mut map = ObjectMap::default();
629
630 for target_path in &info.target_queries {
631 if target_path == &OwnedTargetPath::event_root() {
633 let mut properties = [
634 &mut name,
635 &mut kind,
636 &mut type_,
637 &mut namespace,
638 &mut interval_ms,
639 &mut timestamp,
640 &mut tags,
641 ];
642 properties
643 .iter_mut()
644 .for_each(|property| property.insert(metric, &mut map));
645 break;
646 }
647
648 if let Some(OwnedSegment::Field(field)) = target_path.path.segments.first() {
651 let property = match field.as_ref() {
652 "name" => Some(&mut name),
653 "kind" => Some(&mut kind),
654 "type" => Some(&mut type_),
655 "namespace" => Some(&mut namespace),
656 "timestamp" => Some(&mut timestamp),
657 "interval_ms" => Some(&mut interval_ms),
658 "tags" => Some(&mut tags),
659 _ => None,
660 };
661 if let Some(property) = property {
662 property.insert(metric, &mut map);
663 }
664 }
665 }
666
667 map.into()
668}
669
/// Errors reported when a VRL program addresses a metric through a path that
/// is not supported. They are converted to strings before being returned.
#[derive(Debug, Snafu)]
enum MetricPathError<'a> {
    /// The root path of a metric was the target of an insert or remove.
    #[snafu(display("cannot set root path"))]
    SetPathError,

    /// `path` is not a supported metric path; `expected` lists the valid ones.
    #[snafu(display("invalid path {}: expected one of {}", path, expected))]
    InvalidPath { path: &'a str, expected: &'a str },
}
678
679#[cfg(test)]
680mod test {
681 use chrono::{offset::TimeZone, Utc};
682 use lookup::owned_value_path;
683 use similar_asserts::assert_eq;
684 use vrl::btreemap;
685 use vrl::value::kind::Index;
686
687 use super::super::MetricValue;
688 use super::*;
689 use crate::metric_tags;
690
691 #[test]
692 fn test_field_definitions_in_message() {
693 let definition =
694 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
695 assert_eq!(
696 Definition::new_with_default_metadata(
697 Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
698 [LogNamespace::Legacy]
699 ),
700 move_field_definitions_into_message(definition)
701 );
702
703 let definition = Definition::new_with_default_metadata(
705 Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
706 [LogNamespace::Legacy],
707 );
708 assert_eq!(
709 Definition::new_with_default_metadata(
710 Kind::object(BTreeMap::from([(
711 "message".into(),
712 Kind::bytes().or_integer()
713 )])),
714 [LogNamespace::Legacy]
715 ),
716 move_field_definitions_into_message(definition)
717 );
718 }
719
720 #[test]
721 fn test_merged_array_definitions_simple() {
722 let object: BTreeMap<vrl::value::kind::Field, Kind> = [
725 ("carrot".into(), Kind::bytes()),
726 ("potato".into(), Kind::integer()),
727 ]
728 .into();
729
730 let kind = Kind::array(Collection::from_unknown(Kind::object(object)));
731
732 let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
733
734 let kind = Kind::object(BTreeMap::from([
735 ("carrot".into(), Kind::bytes()),
736 ("potato".into(), Kind::integer()),
737 ]));
738
739 let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
740 let merged = merge_array_definitions(definition);
741
742 assert_eq!(wanted, merged);
743 }
744
745 #[test]
746 fn test_merged_array_definitions_complex() {
747 let object: BTreeMap<vrl::value::kind::Field, Kind> = [
750 ("carrot".into(), Kind::bytes()),
751 ("potato".into(), Kind::integer()),
752 ]
753 .into();
754
755 let array: BTreeMap<Index, Kind> = [
756 (Index::from(0), Kind::integer()),
757 (Index::from(1), Kind::boolean()),
758 (
759 Index::from(2),
760 Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
761 ),
762 ]
763 .into();
764
765 let mut kind = Kind::bytes();
766 kind.add_object(object);
767 kind.add_array(array);
768
769 let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
770
771 let mut kind = Kind::bytes();
772 kind.add_integer();
773 kind.add_boolean();
774 kind.add_object(BTreeMap::from([
775 ("carrot".into(), Kind::bytes().or_undefined()),
776 ("potato".into(), Kind::integer().or_undefined()),
777 ("peas".into(), Kind::bytes().or_undefined()),
778 ]));
779
780 let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
781 let merged = merge_array_definitions(definition);
782
783 assert_eq!(wanted, merged);
784 }
785
786 #[test]
787 fn log_get() {
788 let cases = vec![
789 (
790 BTreeMap::new(),
791 owned_value_path!(),
792 Ok(Some(BTreeMap::new().into())),
793 ),
794 (
795 BTreeMap::from([("foo".into(), "bar".into())]),
796 owned_value_path!(),
797 Ok(Some(BTreeMap::from([("foo".into(), "bar".into())]).into())),
798 ),
799 (
800 BTreeMap::from([("foo".into(), "bar".into())]),
801 owned_value_path!("foo"),
802 Ok(Some("bar".into())),
803 ),
804 (
805 BTreeMap::from([("foo".into(), "bar".into())]),
806 owned_value_path!("bar"),
807 Ok(None),
808 ),
809 (
810 btreemap! { "foo" => vec![btreemap! { "bar" => true }] },
811 owned_value_path!("foo", 0, "bar"),
812 Ok(Some(true.into())),
813 ),
814 (
815 btreemap! { "foo" => btreemap! { "bar baz" => btreemap! { "baz" => 2 } } },
816 owned_value_path!("foo", r"bar baz", "baz"),
817 Ok(Some(2.into())),
818 ),
819 ];
820
821 for (value, path, expect) in cases {
822 let value: ObjectMap = value;
823 let info = ProgramInfo {
824 fallible: false,
825 abortable: false,
826 target_queries: vec![],
827 target_assignments: vec![],
828 };
829 let target = VrlTarget::new(Event::Log(LogEvent::from(value)), &info, false);
830 let path = OwnedTargetPath::event(path);
831
832 assert_eq!(
833 Target::target_get(&target, &path).map(Option::<&Value>::cloned),
834 expect
835 );
836 }
837 }
838
839 #[allow(clippy::too_many_lines)]
840 #[test]
841 fn log_insert() {
842 let cases = vec![
843 (
844 BTreeMap::from([("foo".into(), "bar".into())]),
845 owned_value_path!(0),
846 btreemap! { "baz" => "qux" }.into(),
847 btreemap! { "baz" => "qux" },
848 Ok(()),
849 ),
850 (
851 BTreeMap::from([("foo".into(), "bar".into())]),
852 owned_value_path!("foo"),
853 "baz".into(),
854 btreemap! { "foo" => "baz" },
855 Ok(()),
856 ),
857 (
858 BTreeMap::from([("foo".into(), "bar".into())]),
859 owned_value_path!("foo", 2, "bar baz", "a", "b"),
860 true.into(),
861 btreemap! {
862 "foo" => vec![
863 Value::Null,
864 Value::Null,
865 btreemap! {
866 "bar baz" => btreemap! { "a" => btreemap! { "b" => true } },
867 }.into()
868 ]
869 },
870 Ok(()),
871 ),
872 (
873 btreemap! { "foo" => vec![0, 1, 2] },
874 owned_value_path!("foo", 5),
875 "baz".into(),
876 btreemap! {
877 "foo" => vec![
878 0.into(),
879 1.into(),
880 2.into(),
881 Value::Null,
882 Value::Null,
883 Value::from("baz"),
884 ],
885 },
886 Ok(()),
887 ),
888 (
889 BTreeMap::from([("foo".into(), "bar".into())]),
890 owned_value_path!("foo", 0),
891 "baz".into(),
892 btreemap! { "foo" => vec!["baz"] },
893 Ok(()),
894 ),
895 (
896 btreemap! { "foo" => Value::Array(vec![]) },
897 owned_value_path!("foo", 0),
898 "baz".into(),
899 btreemap! { "foo" => vec!["baz"] },
900 Ok(()),
901 ),
902 (
903 btreemap! { "foo" => Value::Array(vec![0.into()]) },
904 owned_value_path!("foo", 0),
905 "baz".into(),
906 btreemap! { "foo" => vec!["baz"] },
907 Ok(()),
908 ),
909 (
910 btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
911 owned_value_path!("foo", 0),
912 "baz".into(),
913 btreemap! { "foo" => Value::Array(vec!["baz".into(), 1.into()]) },
914 Ok(()),
915 ),
916 (
917 btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
918 owned_value_path!("foo", 1),
919 "baz".into(),
920 btreemap! { "foo" => Value::Array(vec![0.into(), "baz".into()]) },
921 Ok(()),
922 ),
923 ];
924
925 for (object, path, value, expect, result) in cases {
926 let object: ObjectMap = object;
927 let info = ProgramInfo {
928 fallible: false,
929 abortable: false,
930 target_queries: vec![],
931 target_assignments: vec![],
932 };
933 let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
934 let expect = LogEvent::from(expect);
935 let value: Value = value;
936 let path = OwnedTargetPath::event(path);
937
938 assert_eq!(
939 Target::target_insert(&mut target, &path, value.clone()),
940 result
941 );
942 assert_eq!(
943 Target::target_get(&target, &path).map(Option::<&Value>::cloned),
944 Ok(Some(value))
945 );
946 assert_eq!(
947 match target.into_events(LogNamespace::Legacy) {
948 TargetEvents::One(event) => vec![event],
949 TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
950 TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
951 }
952 .first()
953 .cloned()
954 .unwrap(),
955 Event::Log(expect)
956 );
957 }
958 }
959
960 #[test]
961 fn log_remove() {
962 let cases = vec![
963 (
964 BTreeMap::from([("foo".into(), "bar".into())]),
965 owned_value_path!("foo"),
966 false,
967 Some(BTreeMap::new().into()),
968 ),
969 (
970 BTreeMap::from([("foo".into(), "bar".into())]),
971 owned_value_path!(r"foo bar", "foo"),
972 false,
973 Some(btreemap! { "foo" => "bar"}.into()),
974 ),
975 (
976 btreemap! { "foo" => "bar", "baz" => "qux" },
977 owned_value_path!(),
978 false,
979 Some(BTreeMap::new().into()),
980 ),
981 (
982 btreemap! { "foo" => "bar", "baz" => "qux" },
983 owned_value_path!(),
984 true,
985 Some(BTreeMap::new().into()),
986 ),
987 (
988 btreemap! { "foo" => vec![0] },
989 owned_value_path!("foo", 0),
990 false,
991 Some(btreemap! { "foo" => Value::Array(vec![]) }.into()),
992 ),
993 (
994 btreemap! { "foo" => vec![0] },
995 owned_value_path!("foo", 0),
996 true,
997 Some(BTreeMap::new().into()),
998 ),
999 (
1000 btreemap! {
1001 "foo" => btreemap! { "bar baz" => vec![0] },
1002 "bar" => "baz",
1003 },
1004 owned_value_path!("foo", r"bar baz", 0),
1005 false,
1006 Some(
1007 btreemap! {
1008 "foo" => btreemap! { "bar baz" => Value::Array(vec![]) },
1009 "bar" => "baz",
1010 }
1011 .into(),
1012 ),
1013 ),
1014 (
1015 btreemap! {
1016 "foo" => btreemap! { "bar baz" => vec![0] },
1017 "bar" => "baz",
1018 },
1019 owned_value_path!("foo", r"bar baz", 0),
1020 true,
1021 Some(btreemap! { "bar" => "baz" }.into()),
1022 ),
1023 ];
1024
1025 for (object, path, compact, expect) in cases {
1026 let info = ProgramInfo {
1027 fallible: false,
1028 abortable: false,
1029 target_queries: vec![],
1030 target_assignments: vec![],
1031 };
1032 let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
1033 let path = OwnedTargetPath::event(path);
1034 let removed = Target::target_get(&target, &path).unwrap().cloned();
1035
1036 assert_eq!(
1037 Target::target_remove(&mut target, &path, compact),
1038 Ok(removed)
1039 );
1040 assert_eq!(
1041 Target::target_get(&target, &OwnedTargetPath::event_root())
1042 .map(Option::<&Value>::cloned),
1043 Ok(expect)
1044 );
1045 }
1046 }
1047
1048 #[test]
1049 fn log_into_events() {
1050 use vrl::btreemap;
1051
1052 let cases = vec![
1053 (
1054 Value::from(btreemap! {"foo" => "bar"}),
1055 vec![btreemap! {"foo" => "bar"}],
1056 ),
1057 (Value::from(1), vec![btreemap! {"message" => 1}]),
1058 (Value::from("2"), vec![btreemap! {"message" => "2"}]),
1059 (Value::from(true), vec![btreemap! {"message" => true}]),
1060 (
1061 Value::from(vec![
1062 Value::from(1),
1063 Value::from("2"),
1064 Value::from(true),
1065 Value::from(btreemap! {"foo" => "bar"}),
1066 ]),
1067 vec![
1068 btreemap! {"message" => 1},
1069 btreemap! {"message" => "2"},
1070 btreemap! {"message" => true},
1071 btreemap! {"foo" => "bar"},
1072 ],
1073 ),
1074 ];
1075
1076 for (value, expect) in cases {
1077 let metadata = EventMetadata::default();
1078 let info = ProgramInfo {
1079 fallible: false,
1080 abortable: false,
1081 target_queries: vec![],
1082 target_assignments: vec![],
1083 };
1084 let mut target = VrlTarget::new(
1085 Event::Log(LogEvent::new_with_metadata(metadata.clone())),
1086 &info,
1087 false,
1088 );
1089
1090 Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();
1091
1092 assert_eq!(
1093 match target.into_events(LogNamespace::Legacy) {
1094 TargetEvents::One(event) => vec![event],
1095 TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
1096 TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
1097 },
1098 expect
1099 .into_iter()
1100 .map(|v| Event::Log(LogEvent::from_map(v, metadata.clone())))
1101 .collect::<Vec<_>>()
1102 );
1103 }
1104 }
1105
1106 #[test]
1107 fn metric_all_fields() {
1108 let metric = Metric::new(
1109 "zub",
1110 MetricKind::Absolute,
1111 MetricValue::Counter { value: 1.23 },
1112 )
1113 .with_namespace(Some("zoob"))
1114 .with_tags(Some(metric_tags!("tig" => "tog")))
1115 .with_timestamp(Some(
1116 Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0)
1117 .single()
1118 .expect("invalid timestamp"),
1119 ))
1120 .with_interval_ms(Some(NonZero::<u32>::new(507).unwrap()));
1121
1122 let info = ProgramInfo {
1123 fallible: false,
1124 abortable: false,
1125 target_queries: vec![
1126 OwnedTargetPath::event(owned_value_path!("name")),
1127 OwnedTargetPath::event(owned_value_path!("namespace")),
1128 OwnedTargetPath::event(owned_value_path!("interval_ms")),
1129 OwnedTargetPath::event(owned_value_path!("timestamp")),
1130 OwnedTargetPath::event(owned_value_path!("kind")),
1131 OwnedTargetPath::event(owned_value_path!("type")),
1132 OwnedTargetPath::event(owned_value_path!("tags")),
1133 ],
1134 target_assignments: vec![],
1135 };
1136 let target = VrlTarget::new(Event::Metric(metric), &info, false);
1137
1138 assert_eq!(
1139 Ok(Some(
1140 btreemap! {
1141 "name" => "zub",
1142 "namespace" => "zoob",
1143 "interval_ms" => 507,
1144 "timestamp" => Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0).single().expect("invalid timestamp"),
1145 "tags" => btreemap! { "tig" => "tog" },
1146 "kind" => "absolute",
1147 "type" => "counter",
1148 }
1149 .into()
1150 )),
1151 target
1152 .target_get(&OwnedTargetPath::event_root())
1153 .map(Option::<&Value>::cloned)
1154 );
1155 }
1156
1157 #[test]
1158 fn metric_fields() {
1159 struct Case {
1160 path: OwnedValuePath,
1161 current: Option<Value>,
1162 new: Value,
1163 delete: bool,
1164 }
1165
1166 let metric = Metric::new(
1167 "name",
1168 MetricKind::Absolute,
1169 MetricValue::Counter { value: 1.23 },
1170 )
1171 .with_tags(Some(metric_tags!("tig" => "tog")));
1172
1173 let cases = vec![
1174 Case {
1175 path: owned_value_path!("name"),
1176 current: Some(Value::from("name")),
1177 new: Value::from("namefoo"),
1178 delete: false,
1179 },
1180 Case {
1181 path: owned_value_path!("namespace"),
1182 current: None,
1183 new: "namespacefoo".into(),
1184 delete: true,
1185 },
1186 Case {
1187 path: owned_value_path!("timestamp"),
1188 current: None,
1189 new: Utc
1190 .with_ymd_and_hms(2020, 12, 8, 12, 0, 0)
1191 .single()
1192 .expect("invalid timestamp")
1193 .into(),
1194 delete: true,
1195 },
1196 Case {
1197 path: owned_value_path!("interval_ms"),
1198 current: None,
1199 new: 123_456.into(),
1200 delete: true,
1201 },
1202 Case {
1203 path: owned_value_path!("kind"),
1204 current: Some(Value::from("absolute")),
1205 new: "incremental".into(),
1206 delete: false,
1207 },
1208 Case {
1209 path: owned_value_path!("tags", "thing"),
1210 current: None,
1211 new: "footag".into(),
1212 delete: true,
1213 },
1214 ];
1215
1216 let info = ProgramInfo {
1217 fallible: false,
1218 abortable: false,
1219 target_queries: vec![
1220 OwnedTargetPath::event(owned_value_path!("name")),
1221 OwnedTargetPath::event(owned_value_path!("namespace")),
1222 OwnedTargetPath::event(owned_value_path!("timestamp")),
1223 OwnedTargetPath::event(owned_value_path!("interval_ms")),
1224 OwnedTargetPath::event(owned_value_path!("kind")),
1225 ],
1226 target_assignments: vec![],
1227 };
1228 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1229
1230 for Case {
1231 path,
1232 current,
1233 new,
1234 delete,
1235 } in cases
1236 {
1237 let path = OwnedTargetPath::event(path);
1238
1239 assert_eq!(
1240 Ok(current),
1241 target.target_get(&path).map(Option::<&Value>::cloned)
1242 );
1243 assert_eq!(Ok(()), target.target_insert(&path, new.clone()));
1244 assert_eq!(
1245 Ok(Some(new.clone())),
1246 target.target_get(&path).map(Option::<&Value>::cloned)
1247 );
1248
1249 if delete {
1250 assert_eq!(Ok(Some(new)), target.target_remove(&path, true));
1251 assert_eq!(
1252 Ok(None),
1253 target.target_get(&path).map(Option::<&Value>::cloned)
1254 );
1255 }
1256 }
1257 }
1258
1259 #[test]
1260 fn metric_set_tags() {
1261 let metric = Metric::new(
1262 "name",
1263 MetricKind::Absolute,
1264 MetricValue::Counter { value: 1.23 },
1265 )
1266 .with_tags(Some(metric_tags!("tig" => "tog")));
1267
1268 let info = ProgramInfo {
1269 fallible: false,
1270 abortable: false,
1271 target_queries: vec![],
1272 target_assignments: vec![],
1273 };
1274 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1275 let _result = target.target_insert(
1276 &OwnedTargetPath::event(owned_value_path!("tags")),
1277 Value::Object(BTreeMap::from([("a".into(), "b".into())])),
1278 );
1279
1280 match target {
1281 VrlTarget::Metric {
1282 metric,
1283 value: _,
1284 multi_value_tags: _,
1285 } => {
1286 assert!(metric.tags().is_some());
1287 assert_eq!(metric.tags().unwrap(), &crate::metric_tags!("a" => "b"));
1288 }
1289 _ => panic!("must be a metric"),
1290 }
1291 }
1292
1293 #[test]
1294 fn metric_invalid_paths() {
1295 let metric = Metric::new(
1296 "name",
1297 MetricKind::Absolute,
1298 MetricValue::Counter { value: 1.23 },
1299 );
1300
1301 let validpaths_get = [
1302 ".name",
1303 ".namespace",
1304 ".interval_ms",
1305 ".timestamp",
1306 ".kind",
1307 ".tags",
1308 ".type",
1309 ];
1310
1311 let validpaths_set = [
1312 ".name",
1313 ".namespace",
1314 ".interval_ms",
1315 ".timestamp",
1316 ".kind",
1317 ".tags",
1318 ];
1319
1320 let info = ProgramInfo {
1321 fallible: false,
1322 abortable: false,
1323 target_queries: vec![],
1324 target_assignments: vec![],
1325 };
1326 let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1327
1328 assert_eq!(
1329 Err(format!(
1330 "invalid path zork: expected one of {}",
1331 validpaths_get.join(", ")
1332 )),
1333 target.target_get(&OwnedTargetPath::event(owned_value_path!("zork")))
1334 );
1335
1336 assert_eq!(
1337 Err(format!(
1338 "invalid path zork: expected one of {}",
1339 validpaths_set.join(", ")
1340 )),
1341 target.target_insert(
1342 &OwnedTargetPath::event(owned_value_path!("zork")),
1343 "thing".into()
1344 )
1345 );
1346
1347 assert_eq!(
1348 Err(format!(
1349 "invalid path zork: expected one of {}",
1350 validpaths_set.join(", ")
1351 )),
1352 target.target_remove(&OwnedTargetPath::event(owned_value_path!("zork")), true)
1353 );
1354
1355 assert_eq!(
1356 Err(format!(
1357 "invalid path tags.foo.flork: expected one of {}",
1358 validpaths_get.join(", ")
1359 )),
1360 target.target_get(&OwnedTargetPath::event(owned_value_path!(
1361 "tags", "foo", "flork"
1362 )))
1363 );
1364 }
1365
1366 #[test]
1367 fn test_metric_insert_get_multi_value_tag() {
1368 let metric = Metric::new(
1369 "name",
1370 MetricKind::Absolute,
1371 MetricValue::Counter { value: 1.23 },
1372 );
1373 let info = ProgramInfo {
1374 fallible: false,
1375 abortable: false,
1376 target_queries: vec![],
1377 target_assignments: vec![],
1378 };
1379
1380 let mut target = VrlTarget::new(Event::Metric(metric), &info, true);
1381
1382 let value = Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()]);
1383 target
1384 .target_insert(
1385 &OwnedTargetPath::event(owned_value_path!("tags", "foo")),
1386 value,
1387 )
1388 .unwrap();
1389
1390 let vrl_tags_value = target
1391 .target_get(&OwnedTargetPath::event(owned_value_path!("tags")))
1392 .unwrap()
1393 .unwrap();
1394
1395 assert_eq!(
1396 vrl_tags_value,
1397 &Value::Object(BTreeMap::from([(
1398 "foo".into(),
1399 Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()])
1400 )]))
1401 );
1402
1403 let VrlTarget::Metric { metric, .. } = target else {
1404 unreachable!()
1405 };
1406
1407 assert_eq!(metric.tag_value("foo"), Some("b".into()));
1409 }
1410}