vector_core/event/
vrl_target.rs

1use std::borrow::Cow;
2use std::num::{NonZero, TryFromIntError};
3use std::{collections::BTreeMap, convert::TryFrom, marker::PhantomData};
4
5use lookup::lookup_v2::OwnedSegment;
6use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix};
7use snafu::Snafu;
8use vrl::compiler::value::VrlValueConvert;
9use vrl::compiler::{ProgramInfo, SecretTarget, Target};
10use vrl::prelude::Collection;
11use vrl::value::{Kind, ObjectMap, Value};
12
13use super::{metric::TagValue, Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent};
14use crate::config::{log_schema, LogNamespace};
15use crate::schema::Definition;
16
17const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .interval_ms, .timestamp, .kind, .tags";
18
19/// We can get the `type` of the metric in Remap, but can't set it.
20const VALID_METRIC_PATHS_GET: &str =
21    ".name, .namespace, .interval_ms, .timestamp, .kind, .tags, .type";
22
23/// Metrics aren't interested in paths that have a length longer than 3.
24///
25/// The longest path is 2, and we need to check that a third segment doesn't exist as we don't want
26/// fields such as `.tags.host.thing`.
27const MAX_METRIC_PATH_DEPTH: usize = 3;
28
29/// An adapter to turn `Event`s into `vrl_lib::Target`s.
30#[allow(clippy::large_enum_variant)]
31#[derive(Debug, Clone)]
32pub enum VrlTarget {
33    // `LogEvent` is essentially just a destructured `event::LogEvent`, but without the semantics
34    // that `fields` must always be a `Map` variant.
35    LogEvent(Value, EventMetadata),
36    Metric {
37        metric: Metric,
38        value: Value,
39        multi_value_tags: bool,
40    },
41    Trace(Value, EventMetadata),
42}
43
44pub enum TargetEvents {
45    One(Event),
46    Logs(TargetIter<LogEvent>),
47    Traces(TargetIter<TraceEvent>),
48}
49
50pub struct TargetIter<T> {
51    iter: std::vec::IntoIter<Value>,
52    metadata: EventMetadata,
53    _marker: PhantomData<T>,
54    log_namespace: LogNamespace,
55}
56
57fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
58    let mut log = LogEvent::new_with_metadata(metadata);
59    log.maybe_insert(log_schema().message_key_target_path(), value);
60    log
61}
62
63impl Iterator for TargetIter<LogEvent> {
64    type Item = Event;
65
66    fn next(&mut self) -> Option<Self::Item> {
67        self.iter.next().map(|v| {
68            match self.log_namespace {
69                LogNamespace::Legacy => match v {
70                    value @ Value::Object(_) => LogEvent::from_parts(value, self.metadata.clone()),
71                    value => create_log_event(value, self.metadata.clone()),
72                },
73                LogNamespace::Vector => LogEvent::from_parts(v, self.metadata.clone()),
74            }
75            .into()
76        })
77    }
78}
79
80impl Iterator for TargetIter<TraceEvent> {
81    type Item = Event;
82
83    fn next(&mut self) -> Option<Self::Item> {
84        self.iter.next().map(|v| {
85            match v {
86                value @ Value::Object(_) => {
87                    TraceEvent::from(LogEvent::from_parts(value, self.metadata.clone()))
88                }
89                value => TraceEvent::from(create_log_event(value, self.metadata.clone())),
90            }
91            .into()
92        })
93    }
94}
95
96impl VrlTarget {
97    pub fn new(event: Event, info: &ProgramInfo, multi_value_metric_tags: bool) -> Self {
98        match event {
99            Event::Log(event) => {
100                let (value, metadata) = event.into_parts();
101                VrlTarget::LogEvent(value, metadata)
102            }
103            Event::Metric(metric) => {
104                // We pre-generate [`Value`] types for the metric fields accessed in
105                // the event. This allows us to then return references to those
106                // values, even if the field is accessed more than once.
107                let value = precompute_metric_value(&metric, info, multi_value_metric_tags);
108
109                VrlTarget::Metric {
110                    metric,
111                    value,
112                    multi_value_tags: multi_value_metric_tags,
113                }
114            }
115            Event::Trace(event) => {
116                let (fields, metadata) = event.into_parts();
117                VrlTarget::Trace(Value::Object(fields), metadata)
118            }
119        }
120    }
121
122    /// Modifies a schema in the same way that the `into_events` function modifies the event
123    pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
124        let log_namespaces = input.log_namespaces().clone();
125
126        // both namespaces merge arrays, but only `Legacy` moves field definitions into a "message" field.
127        let merged_arrays = merge_array_definitions(input);
128        Definition::combine_log_namespaces(
129            &log_namespaces,
130            move_field_definitions_into_message(merged_arrays.clone()),
131            merged_arrays,
132        )
133    }
134
135    /// Turn the target back into events.
136    ///
137    /// This returns an iterator of events as one event can be turned into multiple by assigning an
138    /// array to `.` in VRL.
139    pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
140        match self {
141            VrlTarget::LogEvent(value, metadata) => match value {
142                value @ Value::Object(_) => {
143                    TargetEvents::One(LogEvent::from_parts(value, metadata).into())
144                }
145
146                Value::Array(values) => TargetEvents::Logs(TargetIter {
147                    iter: values.into_iter(),
148                    metadata,
149                    _marker: PhantomData,
150                    log_namespace,
151                }),
152
153                v => match log_namespace {
154                    LogNamespace::Vector => {
155                        TargetEvents::One(LogEvent::from_parts(v, metadata).into())
156                    }
157                    LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()),
158                },
159            },
160            VrlTarget::Trace(value, metadata) => match value {
161                value @ Value::Object(_) => {
162                    let log = LogEvent::from_parts(value, metadata);
163                    TargetEvents::One(TraceEvent::from(log).into())
164                }
165
166                Value::Array(values) => TargetEvents::Traces(TargetIter {
167                    iter: values.into_iter(),
168                    metadata,
169                    _marker: PhantomData,
170                    log_namespace,
171                }),
172
173                v => TargetEvents::One(create_log_event(v, metadata).into()),
174            },
175            VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)),
176        }
177    }
178
179    fn metadata(&self) -> &EventMetadata {
180        match self {
181            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
182            VrlTarget::Metric { metric, .. } => metric.metadata(),
183        }
184    }
185
186    fn metadata_mut(&mut self) -> &mut EventMetadata {
187        match self {
188            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
189            VrlTarget::Metric { metric, .. } => metric.metadata_mut(),
190        }
191    }
192}
193
194/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
195/// or an object, that data is moved into the `message` field.
196fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
197    let mut message = definition.event_kind().clone();
198    message.remove_object();
199    message.remove_array();
200
201    if !message.is_never() {
202        if let Some(message_key) = log_schema().message_key() {
203            // We need to add the given message type to a field called `message`
204            // in the event.
205            let message = Kind::object(Collection::from(BTreeMap::from([(
206                message_key.to_string().into(),
207                message,
208            )])));
209
210            definition.event_kind_mut().remove_bytes();
211            definition.event_kind_mut().remove_integer();
212            definition.event_kind_mut().remove_float();
213            definition.event_kind_mut().remove_boolean();
214            definition.event_kind_mut().remove_timestamp();
215            definition.event_kind_mut().remove_regex();
216            definition.event_kind_mut().remove_null();
217
218            *definition.event_kind_mut() = definition.event_kind().union(message);
219        }
220    }
221
222    definition
223}
224
225/// If the transform returns an array, the elements of this array will be separated
226/// out into it's individual elements and passed downstream.
227///
228/// The potential types that the transform can output are any of the arrays
229/// elements or any non-array elements that are within the definition. All these
230/// definitions need to be merged together.
231fn merge_array_definitions(mut definition: Definition) -> Definition {
232    if let Some(array) = definition.event_kind().as_array() {
233        let array_kinds = array.reduced_kind();
234
235        let kind = definition.event_kind_mut();
236        kind.remove_array();
237        *kind = kind.union(array_kinds);
238    }
239
240    definition
241}
242
243fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
244    if multi_value_tags {
245        let values = if let Value::Array(values) = value {
246            values.as_slice()
247        } else {
248            std::slice::from_ref(value)
249        };
250
251        let tag_values = values
252            .iter()
253            .filter_map(|value| match value {
254                Value::Bytes(bytes) => {
255                    Some(TagValue::Value(String::from_utf8_lossy(bytes).to_string()))
256                }
257                Value::Null => Some(TagValue::Bare),
258                _ => None,
259            })
260            .collect::<Vec<_>>();
261
262        metric.set_multi_value_tag(name, tag_values);
263    } else {
264        // set a single tag value
265        if let Ok(tag_value) = value.try_bytes_utf8_lossy().map(Cow::into_owned) {
266            metric.replace_tag(name, tag_value);
267        } else if value.is_null() {
268            metric.set_multi_value_tag(name, vec![TagValue::Bare]);
269        }
270    }
271}
272
273impl Target for VrlTarget {
274    fn target_insert(&mut self, target_path: &OwnedTargetPath, value: Value) -> Result<(), String> {
275        let path = &target_path.path;
276        match target_path.prefix {
277            PathPrefix::Event => match self {
278                VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
279                    log.insert(path, value);
280                    Ok(())
281                }
282                VrlTarget::Metric {
283                    ref mut metric,
284                    value: metric_value,
285                    multi_value_tags,
286                } => {
287                    if path.is_root() {
288                        return Err(MetricPathError::SetPathError.to_string());
289                    }
290
291                    if let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
292                        match paths.as_slice() {
293                            ["tags"] => {
294                                let value =
295                                    value.clone().try_object().map_err(|e| e.to_string())?;
296
297                                metric.remove_tags();
298                                for (field, value) in &value {
299                                    set_metric_tag_values(
300                                        field[..].into(),
301                                        value,
302                                        metric,
303                                        *multi_value_tags,
304                                    );
305                                }
306                            }
307                            ["tags", field] => {
308                                set_metric_tag_values(
309                                    (*field).to_owned(),
310                                    &value,
311                                    metric,
312                                    *multi_value_tags,
313                                );
314                            }
315                            ["name"] => {
316                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
317                                metric.series.name.name =
318                                    String::from_utf8_lossy(&value).into_owned();
319                            }
320                            ["namespace"] => {
321                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
322                                metric.series.name.namespace =
323                                    Some(String::from_utf8_lossy(&value).into_owned());
324                            }
325                            ["interval_ms"] => {
326                                let value: i64 =
327                                    value.clone().try_into_i64().map_err(|e| e.to_string())?;
328                                let value: u32 = value
329                                    .try_into()
330                                    .map_err(|e: TryFromIntError| e.to_string())?;
331                                let value = NonZero::try_from(value).map_err(|e| e.to_string())?;
332                                metric.data.time.interval_ms = Some(value);
333                            }
334                            ["timestamp"] => {
335                                let value =
336                                    value.clone().try_timestamp().map_err(|e| e.to_string())?;
337                                metric.data.time.timestamp = Some(value);
338                            }
339                            ["kind"] => {
340                                metric.data.kind = MetricKind::try_from(value.clone())?;
341                            }
342                            _ => {
343                                return Err(MetricPathError::InvalidPath {
344                                    path: &path.to_string(),
345                                    expected: VALID_METRIC_PATHS_SET,
346                                }
347                                .to_string())
348                            }
349                        }
350
351                        metric_value.insert(path, value);
352
353                        return Ok(());
354                    }
355
356                    Err(MetricPathError::InvalidPath {
357                        path: &path.to_string(),
358                        expected: VALID_METRIC_PATHS_SET,
359                    }
360                    .to_string())
361                }
362            },
363            PathPrefix::Metadata => {
364                self.metadata_mut()
365                    .value_mut()
366                    .insert(&target_path.path, value);
367                Ok(())
368            }
369        }
370    }
371
372    #[allow(clippy::redundant_closure_for_method_calls)] // false positive
373    fn target_get(&self, target_path: &OwnedTargetPath) -> Result<Option<&Value>, String> {
374        match target_path.prefix {
375            PathPrefix::Event => match self {
376                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
377                    Ok(log.get(&target_path.path))
378                }
379                VrlTarget::Metric { value, .. } => target_get_metric(&target_path.path, value),
380            },
381            PathPrefix::Metadata => Ok(self.metadata().value().get(&target_path.path)),
382        }
383    }
384
385    fn target_get_mut(
386        &mut self,
387        target_path: &OwnedTargetPath,
388    ) -> Result<Option<&mut Value>, String> {
389        match target_path.prefix {
390            PathPrefix::Event => match self {
391                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
392                    Ok(log.get_mut(&target_path.path))
393                }
394                VrlTarget::Metric { value, .. } => target_get_mut_metric(&target_path.path, value),
395            },
396            PathPrefix::Metadata => Ok(self.metadata_mut().value_mut().get_mut(&target_path.path)),
397        }
398    }
399
400    fn target_remove(
401        &mut self,
402        target_path: &OwnedTargetPath,
403        compact: bool,
404    ) -> Result<Option<vrl::value::Value>, String> {
405        match target_path.prefix {
406            PathPrefix::Event => match self {
407                VrlTarget::LogEvent(ref mut log, _) | VrlTarget::Trace(ref mut log, _) => {
408                    Ok(log.remove(&target_path.path, compact))
409                }
410                VrlTarget::Metric {
411                    ref mut metric,
412                    value,
413                    multi_value_tags: _,
414                } => {
415                    if target_path.path.is_root() {
416                        return Err(MetricPathError::SetPathError.to_string());
417                    }
418
419                    if let Some(paths) = target_path
420                        .path
421                        .to_alternative_components(MAX_METRIC_PATH_DEPTH)
422                    {
423                        let removed_value = match paths.as_slice() {
424                            ["namespace"] => metric.series.name.namespace.take().map(Into::into),
425                            ["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
426                            ["interval_ms"] => metric
427                                .data
428                                .time
429                                .interval_ms
430                                .take()
431                                .map(u32::from)
432                                .map(Into::into),
433                            ["tags"] => metric.series.tags.take().map(|map| {
434                                map.into_iter_single()
435                                    .map(|(k, v)| (k, v.into()))
436                                    .collect::<vrl::value::Value>()
437                            }),
438                            ["tags", field] => metric.remove_tag(field).map(Into::into),
439                            _ => {
440                                return Err(MetricPathError::InvalidPath {
441                                    path: &target_path.path.to_string(),
442                                    expected: VALID_METRIC_PATHS_SET,
443                                }
444                                .to_string())
445                            }
446                        };
447
448                        value.remove(&target_path.path, false);
449
450                        Ok(removed_value)
451                    } else {
452                        Ok(None)
453                    }
454                }
455            },
456            PathPrefix::Metadata => Ok(self
457                .metadata_mut()
458                .value_mut()
459                .remove(&target_path.path, compact)),
460        }
461    }
462}
463
464impl SecretTarget for VrlTarget {
465    fn get_secret(&self, key: &str) -> Option<&str> {
466        self.metadata().secrets().get_secret(key)
467    }
468
469    fn insert_secret(&mut self, key: &str, value: &str) {
470        self.metadata_mut().secrets_mut().insert_secret(key, value);
471    }
472
473    fn remove_secret(&mut self, key: &str) {
474        self.metadata_mut().secrets_mut().remove_secret(key);
475    }
476}
477
478/// Retrieves a value from a the provided metric using the path.
479/// Currently the root path and the following paths are supported:
480/// - `name`
481/// - `namespace`
482/// - `interval_ms`
483/// - `timestamp`
484/// - `kind`
485/// - `tags`
486/// - `tags.<tagname>`
487/// - `type`
488///
489/// Any other paths result in a `MetricPathError::InvalidPath` being returned.
490fn target_get_metric<'a>(
491    path: &OwnedValuePath,
492    value: &'a Value,
493) -> Result<Option<&'a Value>, String> {
494    if path.is_root() {
495        return Ok(Some(value));
496    }
497
498    let value = value.get(path);
499
500    let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
501        return Ok(None);
502    };
503
504    match paths.as_slice() {
505        ["name"]
506        | ["kind"]
507        | ["type"]
508        | ["tags", _]
509        | ["namespace"]
510        | ["timestamp"]
511        | ["interval_ms"]
512        | ["tags"] => Ok(value),
513        _ => Err(MetricPathError::InvalidPath {
514            path: &path.to_string(),
515            expected: VALID_METRIC_PATHS_GET,
516        }
517        .to_string()),
518    }
519}
520
521fn target_get_mut_metric<'a>(
522    path: &OwnedValuePath,
523    value: &'a mut Value,
524) -> Result<Option<&'a mut Value>, String> {
525    if path.is_root() {
526        return Ok(Some(value));
527    }
528
529    let value = value.get_mut(path);
530
531    let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
532        return Ok(None);
533    };
534
535    match paths.as_slice() {
536        ["name"]
537        | ["kind"]
538        | ["tags", _]
539        | ["namespace"]
540        | ["timestamp"]
541        | ["interval_ms"]
542        | ["tags"] => Ok(value),
543        _ => Err(MetricPathError::InvalidPath {
544            path: &path.to_string(),
545            expected: VALID_METRIC_PATHS_SET,
546        }
547        .to_string()),
548    }
549}
550
551/// pre-compute the `Value` structure of the metric.
552///
553/// This structure is partially populated based on the fields accessed by
554/// the VRL program as informed by `ProgramInfo`.
555fn precompute_metric_value(metric: &Metric, info: &ProgramInfo, multi_value_tags: bool) -> Value {
556    struct MetricProperty {
557        property: &'static str,
558        getter: fn(&Metric) -> Option<Value>,
559        set: bool,
560    }
561
562    impl MetricProperty {
563        fn new(property: &'static str, getter: fn(&Metric) -> Option<Value>) -> Self {
564            Self {
565                property,
566                getter,
567                set: false,
568            }
569        }
570
571        fn insert(&mut self, metric: &Metric, map: &mut ObjectMap) {
572            if self.set {
573                return;
574            }
575            if let Some(value) = (self.getter)(metric) {
576                map.insert(self.property.into(), value);
577                self.set = true;
578            }
579        }
580    }
581
582    fn get_single_value_tags(metric: &Metric) -> Option<Value> {
583        metric.tags().cloned().map(|tags| {
584            tags.into_iter_single()
585                .map(|(tag, value)| (tag.into(), value.into()))
586                .collect::<ObjectMap>()
587                .into()
588        })
589    }
590
591    fn get_multi_value_tags(metric: &Metric) -> Option<Value> {
592        metric.tags().cloned().map(|tags| {
593            tags.iter_sets()
594                .map(|(tag, tag_set)| {
595                    let array_values: Vec<Value> = tag_set
596                        .iter()
597                        .map(|v| match v {
598                            Some(s) => Value::Bytes(s.as_bytes().to_vec().into()),
599                            None => Value::Null,
600                        })
601                        .collect();
602                    (tag.into(), Value::Array(array_values))
603                })
604                .collect::<ObjectMap>()
605                .into()
606        })
607    }
608
609    let mut name = MetricProperty::new("name", |metric| Some(metric.name().to_owned().into()));
610    let mut kind = MetricProperty::new("kind", |metric| Some(metric.kind().into()));
611    let mut type_ = MetricProperty::new("type", |metric| Some(metric.value().clone().into()));
612    let mut namespace = MetricProperty::new("namespace", |metric| {
613        metric.namespace().map(String::from).map(Into::into)
614    });
615    let mut interval_ms =
616        MetricProperty::new("interval_ms", |metric| metric.interval_ms().map(Into::into));
617    let mut timestamp =
618        MetricProperty::new("timestamp", |metric| metric.timestamp().map(Into::into));
619    let mut tags = MetricProperty::new(
620        "tags",
621        if multi_value_tags {
622            get_multi_value_tags
623        } else {
624            get_single_value_tags
625        },
626    );
627
628    let mut map = ObjectMap::default();
629
630    for target_path in &info.target_queries {
631        // Accessing a root path requires us to pre-populate all fields.
632        if target_path == &OwnedTargetPath::event_root() {
633            let mut properties = [
634                &mut name,
635                &mut kind,
636                &mut type_,
637                &mut namespace,
638                &mut interval_ms,
639                &mut timestamp,
640                &mut tags,
641            ];
642            properties
643                .iter_mut()
644                .for_each(|property| property.insert(metric, &mut map));
645            break;
646        }
647
648        // For non-root paths, we continuously populate the value with the
649        // relevant data.
650        if let Some(OwnedSegment::Field(field)) = target_path.path.segments.first() {
651            let property = match field.as_ref() {
652                "name" => Some(&mut name),
653                "kind" => Some(&mut kind),
654                "type" => Some(&mut type_),
655                "namespace" => Some(&mut namespace),
656                "timestamp" => Some(&mut timestamp),
657                "interval_ms" => Some(&mut interval_ms),
658                "tags" => Some(&mut tags),
659                _ => None,
660            };
661            if let Some(property) = property {
662                property.insert(metric, &mut map);
663            }
664        }
665    }
666
667    map.into()
668}
669
670#[derive(Debug, Snafu)]
671enum MetricPathError<'a> {
672    #[snafu(display("cannot set root path"))]
673    SetPathError,
674
675    #[snafu(display("invalid path {}: expected one of {}", path, expected))]
676    InvalidPath { path: &'a str, expected: &'a str },
677}
678
679#[cfg(test)]
680mod test {
681    use chrono::{offset::TimeZone, Utc};
682    use lookup::owned_value_path;
683    use similar_asserts::assert_eq;
684    use vrl::btreemap;
685    use vrl::value::kind::Index;
686
687    use super::super::MetricValue;
688    use super::*;
689    use crate::metric_tags;
690
691    #[test]
692    fn test_field_definitions_in_message() {
693        let definition =
694            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
695        assert_eq!(
696            Definition::new_with_default_metadata(
697                Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
698                [LogNamespace::Legacy]
699            ),
700            move_field_definitions_into_message(definition)
701        );
702
703        // Test when a message field already exists.
704        let definition = Definition::new_with_default_metadata(
705            Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
706            [LogNamespace::Legacy],
707        );
708        assert_eq!(
709            Definition::new_with_default_metadata(
710                Kind::object(BTreeMap::from([(
711                    "message".into(),
712                    Kind::bytes().or_integer()
713                )])),
714                [LogNamespace::Legacy]
715            ),
716            move_field_definitions_into_message(definition)
717        );
718    }
719
720    #[test]
721    fn test_merged_array_definitions_simple() {
722        // Test merging the array definitions where the schema definition
723        // is simple, containing only one possible type in the array.
724        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
725            ("carrot".into(), Kind::bytes()),
726            ("potato".into(), Kind::integer()),
727        ]
728        .into();
729
730        let kind = Kind::array(Collection::from_unknown(Kind::object(object)));
731
732        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
733
734        let kind = Kind::object(BTreeMap::from([
735            ("carrot".into(), Kind::bytes()),
736            ("potato".into(), Kind::integer()),
737        ]));
738
739        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
740        let merged = merge_array_definitions(definition);
741
742        assert_eq!(wanted, merged);
743    }
744
745    #[test]
746    fn test_merged_array_definitions_complex() {
747        // Test merging the array definitions where the schema definition
748        // is fairly complex containing multiple different possible types.
749        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
750            ("carrot".into(), Kind::bytes()),
751            ("potato".into(), Kind::integer()),
752        ]
753        .into();
754
755        let array: BTreeMap<Index, Kind> = [
756            (Index::from(0), Kind::integer()),
757            (Index::from(1), Kind::boolean()),
758            (
759                Index::from(2),
760                Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
761            ),
762        ]
763        .into();
764
765        let mut kind = Kind::bytes();
766        kind.add_object(object);
767        kind.add_array(array);
768
769        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
770
771        let mut kind = Kind::bytes();
772        kind.add_integer();
773        kind.add_boolean();
774        kind.add_object(BTreeMap::from([
775            ("carrot".into(), Kind::bytes().or_undefined()),
776            ("potato".into(), Kind::integer().or_undefined()),
777            ("peas".into(), Kind::bytes().or_undefined()),
778        ]));
779
780        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
781        let merged = merge_array_definitions(definition);
782
783        assert_eq!(wanted, merged);
784    }
785
786    #[test]
787    fn log_get() {
788        let cases = vec![
789            (
790                BTreeMap::new(),
791                owned_value_path!(),
792                Ok(Some(BTreeMap::new().into())),
793            ),
794            (
795                BTreeMap::from([("foo".into(), "bar".into())]),
796                owned_value_path!(),
797                Ok(Some(BTreeMap::from([("foo".into(), "bar".into())]).into())),
798            ),
799            (
800                BTreeMap::from([("foo".into(), "bar".into())]),
801                owned_value_path!("foo"),
802                Ok(Some("bar".into())),
803            ),
804            (
805                BTreeMap::from([("foo".into(), "bar".into())]),
806                owned_value_path!("bar"),
807                Ok(None),
808            ),
809            (
810                btreemap! { "foo" => vec![btreemap! { "bar" => true }] },
811                owned_value_path!("foo", 0, "bar"),
812                Ok(Some(true.into())),
813            ),
814            (
815                btreemap! { "foo" => btreemap! { "bar baz" => btreemap! { "baz" => 2 } } },
816                owned_value_path!("foo", r"bar baz", "baz"),
817                Ok(Some(2.into())),
818            ),
819        ];
820
821        for (value, path, expect) in cases {
822            let value: ObjectMap = value;
823            let info = ProgramInfo {
824                fallible: false,
825                abortable: false,
826                target_queries: vec![],
827                target_assignments: vec![],
828            };
829            let target = VrlTarget::new(Event::Log(LogEvent::from(value)), &info, false);
830            let path = OwnedTargetPath::event(path);
831
832            assert_eq!(
833                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
834                expect
835            );
836        }
837    }
838
839    #[allow(clippy::too_many_lines)]
840    #[test]
841    fn log_insert() {
842        let cases = vec![
843            (
844                BTreeMap::from([("foo".into(), "bar".into())]),
845                owned_value_path!(0),
846                btreemap! { "baz" => "qux" }.into(),
847                btreemap! { "baz" => "qux" },
848                Ok(()),
849            ),
850            (
851                BTreeMap::from([("foo".into(), "bar".into())]),
852                owned_value_path!("foo"),
853                "baz".into(),
854                btreemap! { "foo" => "baz" },
855                Ok(()),
856            ),
857            (
858                BTreeMap::from([("foo".into(), "bar".into())]),
859                owned_value_path!("foo", 2, "bar baz", "a", "b"),
860                true.into(),
861                btreemap! {
862                    "foo" => vec![
863                        Value::Null,
864                        Value::Null,
865                        btreemap! {
866                            "bar baz" => btreemap! { "a" => btreemap! { "b" => true } },
867                        }.into()
868                    ]
869                },
870                Ok(()),
871            ),
872            (
873                btreemap! { "foo" => vec![0, 1, 2] },
874                owned_value_path!("foo", 5),
875                "baz".into(),
876                btreemap! {
877                    "foo" => vec![
878                        0.into(),
879                        1.into(),
880                        2.into(),
881                        Value::Null,
882                        Value::Null,
883                        Value::from("baz"),
884                    ],
885                },
886                Ok(()),
887            ),
888            (
889                BTreeMap::from([("foo".into(), "bar".into())]),
890                owned_value_path!("foo", 0),
891                "baz".into(),
892                btreemap! { "foo" => vec!["baz"] },
893                Ok(()),
894            ),
895            (
896                btreemap! { "foo" => Value::Array(vec![]) },
897                owned_value_path!("foo", 0),
898                "baz".into(),
899                btreemap! { "foo" => vec!["baz"] },
900                Ok(()),
901            ),
902            (
903                btreemap! { "foo" => Value::Array(vec![0.into()]) },
904                owned_value_path!("foo", 0),
905                "baz".into(),
906                btreemap! { "foo" => vec!["baz"] },
907                Ok(()),
908            ),
909            (
910                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
911                owned_value_path!("foo", 0),
912                "baz".into(),
913                btreemap! { "foo" => Value::Array(vec!["baz".into(), 1.into()]) },
914                Ok(()),
915            ),
916            (
917                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
918                owned_value_path!("foo", 1),
919                "baz".into(),
920                btreemap! { "foo" => Value::Array(vec![0.into(), "baz".into()]) },
921                Ok(()),
922            ),
923        ];
924
925        for (object, path, value, expect, result) in cases {
926            let object: ObjectMap = object;
927            let info = ProgramInfo {
928                fallible: false,
929                abortable: false,
930                target_queries: vec![],
931                target_assignments: vec![],
932            };
933            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
934            let expect = LogEvent::from(expect);
935            let value: Value = value;
936            let path = OwnedTargetPath::event(path);
937
938            assert_eq!(
939                Target::target_insert(&mut target, &path, value.clone()),
940                result
941            );
942            assert_eq!(
943                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
944                Ok(Some(value))
945            );
946            assert_eq!(
947                match target.into_events(LogNamespace::Legacy) {
948                    TargetEvents::One(event) => vec![event],
949                    TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
950                    TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
951                }
952                .first()
953                .cloned()
954                .unwrap(),
955                Event::Log(expect)
956            );
957        }
958    }
959
960    #[test]
961    fn log_remove() {
962        let cases = vec![
963            (
964                BTreeMap::from([("foo".into(), "bar".into())]),
965                owned_value_path!("foo"),
966                false,
967                Some(BTreeMap::new().into()),
968            ),
969            (
970                BTreeMap::from([("foo".into(), "bar".into())]),
971                owned_value_path!(r"foo bar", "foo"),
972                false,
973                Some(btreemap! { "foo" => "bar"}.into()),
974            ),
975            (
976                btreemap! { "foo" => "bar", "baz" => "qux" },
977                owned_value_path!(),
978                false,
979                Some(BTreeMap::new().into()),
980            ),
981            (
982                btreemap! { "foo" => "bar", "baz" => "qux" },
983                owned_value_path!(),
984                true,
985                Some(BTreeMap::new().into()),
986            ),
987            (
988                btreemap! { "foo" => vec![0] },
989                owned_value_path!("foo", 0),
990                false,
991                Some(btreemap! { "foo" => Value::Array(vec![]) }.into()),
992            ),
993            (
994                btreemap! { "foo" => vec![0] },
995                owned_value_path!("foo", 0),
996                true,
997                Some(BTreeMap::new().into()),
998            ),
999            (
1000                btreemap! {
1001                    "foo" => btreemap! { "bar baz" => vec![0] },
1002                    "bar" => "baz",
1003                },
1004                owned_value_path!("foo", r"bar baz", 0),
1005                false,
1006                Some(
1007                    btreemap! {
1008                        "foo" => btreemap! { "bar baz" => Value::Array(vec![]) },
1009                        "bar" => "baz",
1010                    }
1011                    .into(),
1012                ),
1013            ),
1014            (
1015                btreemap! {
1016                    "foo" => btreemap! { "bar baz" => vec![0] },
1017                    "bar" => "baz",
1018                },
1019                owned_value_path!("foo", r"bar baz", 0),
1020                true,
1021                Some(btreemap! { "bar" => "baz" }.into()),
1022            ),
1023        ];
1024
1025        for (object, path, compact, expect) in cases {
1026            let info = ProgramInfo {
1027                fallible: false,
1028                abortable: false,
1029                target_queries: vec![],
1030                target_assignments: vec![],
1031            };
1032            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
1033            let path = OwnedTargetPath::event(path);
1034            let removed = Target::target_get(&target, &path).unwrap().cloned();
1035
1036            assert_eq!(
1037                Target::target_remove(&mut target, &path, compact),
1038                Ok(removed)
1039            );
1040            assert_eq!(
1041                Target::target_get(&target, &OwnedTargetPath::event_root())
1042                    .map(Option::<&Value>::cloned),
1043                Ok(expect)
1044            );
1045        }
1046    }
1047
1048    #[test]
1049    fn log_into_events() {
1050        use vrl::btreemap;
1051
1052        let cases = vec![
1053            (
1054                Value::from(btreemap! {"foo" => "bar"}),
1055                vec![btreemap! {"foo" => "bar"}],
1056            ),
1057            (Value::from(1), vec![btreemap! {"message" => 1}]),
1058            (Value::from("2"), vec![btreemap! {"message" => "2"}]),
1059            (Value::from(true), vec![btreemap! {"message" => true}]),
1060            (
1061                Value::from(vec![
1062                    Value::from(1),
1063                    Value::from("2"),
1064                    Value::from(true),
1065                    Value::from(btreemap! {"foo" => "bar"}),
1066                ]),
1067                vec![
1068                    btreemap! {"message" => 1},
1069                    btreemap! {"message" => "2"},
1070                    btreemap! {"message" => true},
1071                    btreemap! {"foo" => "bar"},
1072                ],
1073            ),
1074        ];
1075
1076        for (value, expect) in cases {
1077            let metadata = EventMetadata::default();
1078            let info = ProgramInfo {
1079                fallible: false,
1080                abortable: false,
1081                target_queries: vec![],
1082                target_assignments: vec![],
1083            };
1084            let mut target = VrlTarget::new(
1085                Event::Log(LogEvent::new_with_metadata(metadata.clone())),
1086                &info,
1087                false,
1088            );
1089
1090            Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();
1091
1092            assert_eq!(
1093                match target.into_events(LogNamespace::Legacy) {
1094                    TargetEvents::One(event) => vec![event],
1095                    TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
1096                    TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
1097                },
1098                expect
1099                    .into_iter()
1100                    .map(|v| Event::Log(LogEvent::from_map(v, metadata.clone())))
1101                    .collect::<Vec<_>>()
1102            );
1103        }
1104    }
1105
1106    #[test]
1107    fn metric_all_fields() {
1108        let metric = Metric::new(
1109            "zub",
1110            MetricKind::Absolute,
1111            MetricValue::Counter { value: 1.23 },
1112        )
1113        .with_namespace(Some("zoob"))
1114        .with_tags(Some(metric_tags!("tig" => "tog")))
1115        .with_timestamp(Some(
1116            Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0)
1117                .single()
1118                .expect("invalid timestamp"),
1119        ))
1120        .with_interval_ms(Some(NonZero::<u32>::new(507).unwrap()));
1121
1122        let info = ProgramInfo {
1123            fallible: false,
1124            abortable: false,
1125            target_queries: vec![
1126                OwnedTargetPath::event(owned_value_path!("name")),
1127                OwnedTargetPath::event(owned_value_path!("namespace")),
1128                OwnedTargetPath::event(owned_value_path!("interval_ms")),
1129                OwnedTargetPath::event(owned_value_path!("timestamp")),
1130                OwnedTargetPath::event(owned_value_path!("kind")),
1131                OwnedTargetPath::event(owned_value_path!("type")),
1132                OwnedTargetPath::event(owned_value_path!("tags")),
1133            ],
1134            target_assignments: vec![],
1135        };
1136        let target = VrlTarget::new(Event::Metric(metric), &info, false);
1137
1138        assert_eq!(
1139            Ok(Some(
1140                btreemap! {
1141                    "name" => "zub",
1142                    "namespace" => "zoob",
1143                    "interval_ms" => 507,
1144                    "timestamp" => Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0).single().expect("invalid timestamp"),
1145                    "tags" => btreemap! { "tig" => "tog" },
1146                    "kind" => "absolute",
1147                    "type" => "counter",
1148                }
1149                .into()
1150            )),
1151            target
1152                .target_get(&OwnedTargetPath::event_root())
1153                .map(Option::<&Value>::cloned)
1154        );
1155    }
1156
1157    #[test]
1158    fn metric_fields() {
1159        struct Case {
1160            path: OwnedValuePath,
1161            current: Option<Value>,
1162            new: Value,
1163            delete: bool,
1164        }
1165
1166        let metric = Metric::new(
1167            "name",
1168            MetricKind::Absolute,
1169            MetricValue::Counter { value: 1.23 },
1170        )
1171        .with_tags(Some(metric_tags!("tig" => "tog")));
1172
1173        let cases = vec![
1174            Case {
1175                path: owned_value_path!("name"),
1176                current: Some(Value::from("name")),
1177                new: Value::from("namefoo"),
1178                delete: false,
1179            },
1180            Case {
1181                path: owned_value_path!("namespace"),
1182                current: None,
1183                new: "namespacefoo".into(),
1184                delete: true,
1185            },
1186            Case {
1187                path: owned_value_path!("timestamp"),
1188                current: None,
1189                new: Utc
1190                    .with_ymd_and_hms(2020, 12, 8, 12, 0, 0)
1191                    .single()
1192                    .expect("invalid timestamp")
1193                    .into(),
1194                delete: true,
1195            },
1196            Case {
1197                path: owned_value_path!("interval_ms"),
1198                current: None,
1199                new: 123_456.into(),
1200                delete: true,
1201            },
1202            Case {
1203                path: owned_value_path!("kind"),
1204                current: Some(Value::from("absolute")),
1205                new: "incremental".into(),
1206                delete: false,
1207            },
1208            Case {
1209                path: owned_value_path!("tags", "thing"),
1210                current: None,
1211                new: "footag".into(),
1212                delete: true,
1213            },
1214        ];
1215
1216        let info = ProgramInfo {
1217            fallible: false,
1218            abortable: false,
1219            target_queries: vec![
1220                OwnedTargetPath::event(owned_value_path!("name")),
1221                OwnedTargetPath::event(owned_value_path!("namespace")),
1222                OwnedTargetPath::event(owned_value_path!("timestamp")),
1223                OwnedTargetPath::event(owned_value_path!("interval_ms")),
1224                OwnedTargetPath::event(owned_value_path!("kind")),
1225            ],
1226            target_assignments: vec![],
1227        };
1228        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1229
1230        for Case {
1231            path,
1232            current,
1233            new,
1234            delete,
1235        } in cases
1236        {
1237            let path = OwnedTargetPath::event(path);
1238
1239            assert_eq!(
1240                Ok(current),
1241                target.target_get(&path).map(Option::<&Value>::cloned)
1242            );
1243            assert_eq!(Ok(()), target.target_insert(&path, new.clone()));
1244            assert_eq!(
1245                Ok(Some(new.clone())),
1246                target.target_get(&path).map(Option::<&Value>::cloned)
1247            );
1248
1249            if delete {
1250                assert_eq!(Ok(Some(new)), target.target_remove(&path, true));
1251                assert_eq!(
1252                    Ok(None),
1253                    target.target_get(&path).map(Option::<&Value>::cloned)
1254                );
1255            }
1256        }
1257    }
1258
1259    #[test]
1260    fn metric_set_tags() {
1261        let metric = Metric::new(
1262            "name",
1263            MetricKind::Absolute,
1264            MetricValue::Counter { value: 1.23 },
1265        )
1266        .with_tags(Some(metric_tags!("tig" => "tog")));
1267
1268        let info = ProgramInfo {
1269            fallible: false,
1270            abortable: false,
1271            target_queries: vec![],
1272            target_assignments: vec![],
1273        };
1274        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1275        let _result = target.target_insert(
1276            &OwnedTargetPath::event(owned_value_path!("tags")),
1277            Value::Object(BTreeMap::from([("a".into(), "b".into())])),
1278        );
1279
1280        match target {
1281            VrlTarget::Metric {
1282                metric,
1283                value: _,
1284                multi_value_tags: _,
1285            } => {
1286                assert!(metric.tags().is_some());
1287                assert_eq!(metric.tags().unwrap(), &crate::metric_tags!("a" => "b"));
1288            }
1289            _ => panic!("must be a metric"),
1290        }
1291    }
1292
1293    #[test]
1294    fn metric_invalid_paths() {
1295        let metric = Metric::new(
1296            "name",
1297            MetricKind::Absolute,
1298            MetricValue::Counter { value: 1.23 },
1299        );
1300
1301        let validpaths_get = [
1302            ".name",
1303            ".namespace",
1304            ".interval_ms",
1305            ".timestamp",
1306            ".kind",
1307            ".tags",
1308            ".type",
1309        ];
1310
1311        let validpaths_set = [
1312            ".name",
1313            ".namespace",
1314            ".interval_ms",
1315            ".timestamp",
1316            ".kind",
1317            ".tags",
1318        ];
1319
1320        let info = ProgramInfo {
1321            fallible: false,
1322            abortable: false,
1323            target_queries: vec![],
1324            target_assignments: vec![],
1325        };
1326        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1327
1328        assert_eq!(
1329            Err(format!(
1330                "invalid path zork: expected one of {}",
1331                validpaths_get.join(", ")
1332            )),
1333            target.target_get(&OwnedTargetPath::event(owned_value_path!("zork")))
1334        );
1335
1336        assert_eq!(
1337            Err(format!(
1338                "invalid path zork: expected one of {}",
1339                validpaths_set.join(", ")
1340            )),
1341            target.target_insert(
1342                &OwnedTargetPath::event(owned_value_path!("zork")),
1343                "thing".into()
1344            )
1345        );
1346
1347        assert_eq!(
1348            Err(format!(
1349                "invalid path zork: expected one of {}",
1350                validpaths_set.join(", ")
1351            )),
1352            target.target_remove(&OwnedTargetPath::event(owned_value_path!("zork")), true)
1353        );
1354
1355        assert_eq!(
1356            Err(format!(
1357                "invalid path tags.foo.flork: expected one of {}",
1358                validpaths_get.join(", ")
1359            )),
1360            target.target_get(&OwnedTargetPath::event(owned_value_path!(
1361                "tags", "foo", "flork"
1362            )))
1363        );
1364    }
1365
1366    #[test]
1367    fn test_metric_insert_get_multi_value_tag() {
1368        let metric = Metric::new(
1369            "name",
1370            MetricKind::Absolute,
1371            MetricValue::Counter { value: 1.23 },
1372        );
1373        let info = ProgramInfo {
1374            fallible: false,
1375            abortable: false,
1376            target_queries: vec![],
1377            target_assignments: vec![],
1378        };
1379
1380        let mut target = VrlTarget::new(Event::Metric(metric), &info, true);
1381
1382        let value = Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()]);
1383        target
1384            .target_insert(
1385                &OwnedTargetPath::event(owned_value_path!("tags", "foo")),
1386                value,
1387            )
1388            .unwrap();
1389
1390        let vrl_tags_value = target
1391            .target_get(&OwnedTargetPath::event(owned_value_path!("tags")))
1392            .unwrap()
1393            .unwrap();
1394
1395        assert_eq!(
1396            vrl_tags_value,
1397            &Value::Object(BTreeMap::from([(
1398                "foo".into(),
1399                Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()])
1400            )]))
1401        );
1402
1403        let VrlTarget::Metric { metric, .. } = target else {
1404            unreachable!()
1405        };
1406
1407        // get single value (should be the last one)
1408        assert_eq!(metric.tag_value("foo"), Some("b".into()));
1409    }
1410}