vector_core/event/
vrl_target.rs

1use std::{
2    borrow::Cow,
3    collections::BTreeMap,
4    convert::TryFrom,
5    marker::PhantomData,
6    num::{NonZero, TryFromIntError},
7};
8
9use lookup::{OwnedTargetPath, OwnedValuePath, PathPrefix, lookup_v2::OwnedSegment};
10use snafu::Snafu;
11use vrl::{
12    compiler::{ProgramInfo, SecretTarget, Target, value::VrlValueConvert},
13    prelude::Collection,
14    value::{Kind, ObjectMap, Value},
15};
16
17use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent, metric::TagValue};
18use crate::{
19    config::{LogNamespace, log_schema},
20    schema::Definition,
21};
22
23const VALID_METRIC_PATHS_SET: &str = ".name, .namespace, .interval_ms, .timestamp, .kind, .tags";
24
25/// We can get the `type` of the metric in Remap, but can't set it.
26const VALID_METRIC_PATHS_GET: &str =
27    ".name, .namespace, .interval_ms, .timestamp, .kind, .tags, .type";
28
29/// Metrics aren't interested in paths that have a length longer than 3.
30///
31/// The longest path is 2, and we need to check that a third segment doesn't exist as we don't want
32/// fields such as `.tags.host.thing`.
33const MAX_METRIC_PATH_DEPTH: usize = 3;
34
35/// An adapter to turn `Event`s into `vrl_lib::Target`s.
36#[allow(clippy::large_enum_variant)]
37#[derive(Debug, Clone)]
38pub enum VrlTarget {
39    // `LogEvent` is essentially just a destructured `event::LogEvent`, but without the semantics
40    // that `fields` must always be a `Map` variant.
41    LogEvent(Value, EventMetadata),
42    Metric {
43        metric: Metric,
44        value: Value,
45        multi_value_tags: bool,
46    },
47    Trace(Value, EventMetadata),
48}
49
50pub enum TargetEvents {
51    One(Event),
52    Logs(TargetIter<LogEvent>),
53    Traces(TargetIter<TraceEvent>),
54}
55
56pub struct TargetIter<T> {
57    iter: std::vec::IntoIter<Value>,
58    metadata: EventMetadata,
59    _marker: PhantomData<T>,
60    log_namespace: LogNamespace,
61}
62
63fn create_log_event(value: Value, metadata: EventMetadata) -> LogEvent {
64    let mut log = LogEvent::new_with_metadata(metadata);
65    log.maybe_insert(log_schema().message_key_target_path(), value);
66    log
67}
68
69impl Iterator for TargetIter<LogEvent> {
70    type Item = Event;
71
72    fn next(&mut self) -> Option<Self::Item> {
73        self.iter.next().map(|v| {
74            match self.log_namespace {
75                LogNamespace::Legacy => match v {
76                    value @ Value::Object(_) => LogEvent::from_parts(value, self.metadata.clone()),
77                    value => create_log_event(value, self.metadata.clone()),
78                },
79                LogNamespace::Vector => LogEvent::from_parts(v, self.metadata.clone()),
80            }
81            .into()
82        })
83    }
84}
85
86impl Iterator for TargetIter<TraceEvent> {
87    type Item = Event;
88
89    fn next(&mut self) -> Option<Self::Item> {
90        self.iter.next().map(|v| {
91            match v {
92                value @ Value::Object(_) => {
93                    TraceEvent::from(LogEvent::from_parts(value, self.metadata.clone()))
94                }
95                value => TraceEvent::from(create_log_event(value, self.metadata.clone())),
96            }
97            .into()
98        })
99    }
100}
101
102impl VrlTarget {
103    pub fn new(event: Event, info: &ProgramInfo, multi_value_metric_tags: bool) -> Self {
104        match event {
105            Event::Log(event) => {
106                let (value, metadata) = event.into_parts();
107                VrlTarget::LogEvent(value, metadata)
108            }
109            Event::Metric(metric) => {
110                // We pre-generate [`Value`] types for the metric fields accessed in
111                // the event. This allows us to then return references to those
112                // values, even if the field is accessed more than once.
113                let value = precompute_metric_value(&metric, info, multi_value_metric_tags);
114
115                VrlTarget::Metric {
116                    metric,
117                    value,
118                    multi_value_tags: multi_value_metric_tags,
119                }
120            }
121            Event::Trace(event) => {
122                let (fields, metadata) = event.into_parts();
123                VrlTarget::Trace(Value::Object(fields), metadata)
124            }
125        }
126    }
127
128    /// Modifies a schema in the same way that the `into_events` function modifies the event
129    pub fn modify_schema_definition_for_into_events(input: Definition) -> Definition {
130        let log_namespaces = input.log_namespaces().clone();
131
132        // both namespaces merge arrays, but only `Legacy` moves field definitions into a "message" field.
133        let merged_arrays = merge_array_definitions(input);
134        Definition::combine_log_namespaces(
135            &log_namespaces,
136            move_field_definitions_into_message(merged_arrays.clone()),
137            merged_arrays,
138        )
139    }
140
141    /// Turn the target back into events.
142    ///
143    /// This returns an iterator of events as one event can be turned into multiple by assigning an
144    /// array to `.` in VRL.
145    pub fn into_events(self, log_namespace: LogNamespace) -> TargetEvents {
146        match self {
147            VrlTarget::LogEvent(value, metadata) => match value {
148                value @ Value::Object(_) => {
149                    TargetEvents::One(LogEvent::from_parts(value, metadata).into())
150                }
151
152                Value::Array(values) => TargetEvents::Logs(TargetIter {
153                    iter: values.into_iter(),
154                    metadata,
155                    _marker: PhantomData,
156                    log_namespace,
157                }),
158
159                v => match log_namespace {
160                    LogNamespace::Vector => {
161                        TargetEvents::One(LogEvent::from_parts(v, metadata).into())
162                    }
163                    LogNamespace::Legacy => TargetEvents::One(create_log_event(v, metadata).into()),
164                },
165            },
166            VrlTarget::Trace(value, metadata) => match value {
167                value @ Value::Object(_) => {
168                    let log = LogEvent::from_parts(value, metadata);
169                    TargetEvents::One(TraceEvent::from(log).into())
170                }
171
172                Value::Array(values) => TargetEvents::Traces(TargetIter {
173                    iter: values.into_iter(),
174                    metadata,
175                    _marker: PhantomData,
176                    log_namespace,
177                }),
178
179                v => TargetEvents::One(create_log_event(v, metadata).into()),
180            },
181            VrlTarget::Metric { metric, .. } => TargetEvents::One(Event::Metric(metric)),
182        }
183    }
184
185    fn metadata(&self) -> &EventMetadata {
186        match self {
187            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
188            VrlTarget::Metric { metric, .. } => metric.metadata(),
189        }
190    }
191
192    fn metadata_mut(&mut self) -> &mut EventMetadata {
193        match self {
194            VrlTarget::LogEvent(_, metadata) | VrlTarget::Trace(_, metadata) => metadata,
195            VrlTarget::Metric { metric, .. } => metric.metadata_mut(),
196        }
197    }
198}
199
200/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]),
201/// or an object, that data is moved into the `message` field.
202fn move_field_definitions_into_message(mut definition: Definition) -> Definition {
203    let mut message = definition.event_kind().clone();
204    message.remove_object();
205    message.remove_array();
206
207    if !message.is_never()
208        && let Some(message_key) = log_schema().message_key()
209    {
210        // We need to add the given message type to a field called `message`
211        // in the event.
212        let message = Kind::object(Collection::from(BTreeMap::from([(
213            message_key.to_string().into(),
214            message,
215        )])));
216
217        definition.event_kind_mut().remove_bytes();
218        definition.event_kind_mut().remove_integer();
219        definition.event_kind_mut().remove_float();
220        definition.event_kind_mut().remove_boolean();
221        definition.event_kind_mut().remove_timestamp();
222        definition.event_kind_mut().remove_regex();
223        definition.event_kind_mut().remove_null();
224
225        *definition.event_kind_mut() = definition.event_kind().union(message);
226    }
227
228    definition
229}
230
231/// If the transform returns an array, the elements of this array will be separated
232/// out into it's individual elements and passed downstream.
233///
234/// The potential types that the transform can output are any of the arrays
235/// elements or any non-array elements that are within the definition. All these
236/// definitions need to be merged together.
237fn merge_array_definitions(mut definition: Definition) -> Definition {
238    if let Some(array) = definition.event_kind().as_array() {
239        let array_kinds = array.reduced_kind();
240
241        let kind = definition.event_kind_mut();
242        kind.remove_array();
243        *kind = kind.union(array_kinds);
244    }
245
246    definition
247}
248
249fn set_metric_tag_values(name: String, value: &Value, metric: &mut Metric, multi_value_tags: bool) {
250    if multi_value_tags {
251        let values = if let Value::Array(values) = value {
252            values.as_slice()
253        } else {
254            std::slice::from_ref(value)
255        };
256
257        let tag_values = values
258            .iter()
259            .filter_map(|value| match value {
260                Value::Bytes(bytes) => {
261                    Some(TagValue::Value(String::from_utf8_lossy(bytes).to_string()))
262                }
263                Value::Null => Some(TagValue::Bare),
264                _ => None,
265            })
266            .collect::<Vec<_>>();
267
268        metric.set_multi_value_tag(name, tag_values);
269    } else {
270        // set a single tag value
271        if let Ok(tag_value) = value.try_bytes_utf8_lossy().map(Cow::into_owned) {
272            metric.replace_tag(name, tag_value);
273        } else if value.is_null() {
274            metric.set_multi_value_tag(name, vec![TagValue::Bare]);
275        }
276    }
277}
278
279impl Target for VrlTarget {
280    fn target_insert(&mut self, target_path: &OwnedTargetPath, value: Value) -> Result<(), String> {
281        let path = &target_path.path;
282        match target_path.prefix {
283            PathPrefix::Event => match self {
284                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
285                    log.insert(path, value);
286                    Ok(())
287                }
288                VrlTarget::Metric {
289                    metric,
290                    value: metric_value,
291                    multi_value_tags,
292                } => {
293                    if path.is_root() {
294                        return Err(MetricPathError::SetPathError.to_string());
295                    }
296
297                    if let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) {
298                        match paths.as_slice() {
299                            ["tags"] => {
300                                let value =
301                                    value.clone().try_object().map_err(|e| e.to_string())?;
302
303                                metric.remove_tags();
304                                for (field, value) in &value {
305                                    set_metric_tag_values(
306                                        field[..].into(),
307                                        value,
308                                        metric,
309                                        *multi_value_tags,
310                                    );
311                                }
312                            }
313                            ["tags", field] => {
314                                set_metric_tag_values(
315                                    (*field).to_owned(),
316                                    &value,
317                                    metric,
318                                    *multi_value_tags,
319                                );
320                            }
321                            ["name"] => {
322                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
323                                metric.series.name.name =
324                                    String::from_utf8_lossy(&value).into_owned();
325                            }
326                            ["namespace"] => {
327                                let value = value.clone().try_bytes().map_err(|e| e.to_string())?;
328                                metric.series.name.namespace =
329                                    Some(String::from_utf8_lossy(&value).into_owned());
330                            }
331                            ["interval_ms"] => {
332                                let value: i64 =
333                                    value.clone().try_into_i64().map_err(|e| e.to_string())?;
334                                let value: u32 = value
335                                    .try_into()
336                                    .map_err(|e: TryFromIntError| e.to_string())?;
337                                let value = NonZero::try_from(value).map_err(|e| e.to_string())?;
338                                metric.data.time.interval_ms = Some(value);
339                            }
340                            ["timestamp"] => {
341                                let value =
342                                    value.clone().try_timestamp().map_err(|e| e.to_string())?;
343                                metric.data.time.timestamp = Some(value);
344                            }
345                            ["kind"] => {
346                                metric.data.kind = MetricKind::try_from(value.clone())?;
347                            }
348                            _ => {
349                                return Err(MetricPathError::InvalidPath {
350                                    path: &path.to_string(),
351                                    expected: VALID_METRIC_PATHS_SET,
352                                }
353                                .to_string());
354                            }
355                        }
356
357                        metric_value.insert(path, value);
358
359                        return Ok(());
360                    }
361
362                    Err(MetricPathError::InvalidPath {
363                        path: &path.to_string(),
364                        expected: VALID_METRIC_PATHS_SET,
365                    }
366                    .to_string())
367                }
368            },
369            PathPrefix::Metadata => {
370                self.metadata_mut()
371                    .value_mut()
372                    .insert(&target_path.path, value);
373                Ok(())
374            }
375        }
376    }
377
378    #[allow(clippy::redundant_closure_for_method_calls)] // false positive
379    fn target_get(&self, target_path: &OwnedTargetPath) -> Result<Option<&Value>, String> {
380        match target_path.prefix {
381            PathPrefix::Event => match self {
382                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
383                    Ok(log.get(&target_path.path))
384                }
385                VrlTarget::Metric { value, .. } => target_get_metric(&target_path.path, value),
386            },
387            PathPrefix::Metadata => Ok(self.metadata().value().get(&target_path.path)),
388        }
389    }
390
391    fn target_get_mut(
392        &mut self,
393        target_path: &OwnedTargetPath,
394    ) -> Result<Option<&mut Value>, String> {
395        match target_path.prefix {
396            PathPrefix::Event => match self {
397                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
398                    Ok(log.get_mut(&target_path.path))
399                }
400                VrlTarget::Metric { value, .. } => target_get_mut_metric(&target_path.path, value),
401            },
402            PathPrefix::Metadata => Ok(self.metadata_mut().value_mut().get_mut(&target_path.path)),
403        }
404    }
405
406    fn target_remove(
407        &mut self,
408        target_path: &OwnedTargetPath,
409        compact: bool,
410    ) -> Result<Option<vrl::value::Value>, String> {
411        match target_path.prefix {
412            PathPrefix::Event => match self {
413                VrlTarget::LogEvent(log, _) | VrlTarget::Trace(log, _) => {
414                    Ok(log.remove(&target_path.path, compact))
415                }
416                VrlTarget::Metric {
417                    metric,
418                    value,
419                    multi_value_tags: _,
420                } => {
421                    if target_path.path.is_root() {
422                        return Err(MetricPathError::SetPathError.to_string());
423                    }
424
425                    if let Some(paths) = target_path
426                        .path
427                        .to_alternative_components(MAX_METRIC_PATH_DEPTH)
428                    {
429                        let removed_value = match paths.as_slice() {
430                            ["namespace"] => metric.series.name.namespace.take().map(Into::into),
431                            ["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
432                            ["interval_ms"] => metric
433                                .data
434                                .time
435                                .interval_ms
436                                .take()
437                                .map(u32::from)
438                                .map(Into::into),
439                            ["tags"] => metric.series.tags.take().map(|map| {
440                                map.into_iter_single()
441                                    .map(|(k, v)| (k, v.into()))
442                                    .collect::<vrl::value::Value>()
443                            }),
444                            ["tags", field] => metric.remove_tag(field).map(Into::into),
445                            _ => {
446                                return Err(MetricPathError::InvalidPath {
447                                    path: &target_path.path.to_string(),
448                                    expected: VALID_METRIC_PATHS_SET,
449                                }
450                                .to_string());
451                            }
452                        };
453
454                        value.remove(&target_path.path, false);
455
456                        Ok(removed_value)
457                    } else {
458                        Ok(None)
459                    }
460                }
461            },
462            PathPrefix::Metadata => Ok(self
463                .metadata_mut()
464                .value_mut()
465                .remove(&target_path.path, compact)),
466        }
467    }
468}
469
470impl SecretTarget for VrlTarget {
471    fn get_secret(&self, key: &str) -> Option<&str> {
472        self.metadata().secrets().get_secret(key)
473    }
474
475    fn insert_secret(&mut self, key: &str, value: &str) {
476        self.metadata_mut().secrets_mut().insert_secret(key, value);
477    }
478
479    fn remove_secret(&mut self, key: &str) {
480        self.metadata_mut().secrets_mut().remove_secret(key);
481    }
482}
483
484/// Retrieves a value from a the provided metric using the path.
485/// Currently the root path and the following paths are supported:
486/// - `name`
487/// - `namespace`
488/// - `interval_ms`
489/// - `timestamp`
490/// - `kind`
491/// - `tags`
492/// - `tags.<tagname>`
493/// - `type`
494///
495/// Any other paths result in a `MetricPathError::InvalidPath` being returned.
496fn target_get_metric<'a>(
497    path: &OwnedValuePath,
498    value: &'a Value,
499) -> Result<Option<&'a Value>, String> {
500    if path.is_root() {
501        return Ok(Some(value));
502    }
503
504    let value = value.get(path);
505
506    let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
507        return Ok(None);
508    };
509
510    match paths.as_slice() {
511        ["name"]
512        | ["kind"]
513        | ["type"]
514        | ["tags", _]
515        | ["namespace"]
516        | ["timestamp"]
517        | ["interval_ms"]
518        | ["tags"] => Ok(value),
519        _ => Err(MetricPathError::InvalidPath {
520            path: &path.to_string(),
521            expected: VALID_METRIC_PATHS_GET,
522        }
523        .to_string()),
524    }
525}
526
527fn target_get_mut_metric<'a>(
528    path: &OwnedValuePath,
529    value: &'a mut Value,
530) -> Result<Option<&'a mut Value>, String> {
531    if path.is_root() {
532        return Ok(Some(value));
533    }
534
535    let value = value.get_mut(path);
536
537    let Some(paths) = path.to_alternative_components(MAX_METRIC_PATH_DEPTH) else {
538        return Ok(None);
539    };
540
541    match paths.as_slice() {
542        ["name"]
543        | ["kind"]
544        | ["tags", _]
545        | ["namespace"]
546        | ["timestamp"]
547        | ["interval_ms"]
548        | ["tags"] => Ok(value),
549        _ => Err(MetricPathError::InvalidPath {
550            path: &path.to_string(),
551            expected: VALID_METRIC_PATHS_SET,
552        }
553        .to_string()),
554    }
555}
556
557/// pre-compute the `Value` structure of the metric.
558///
559/// This structure is partially populated based on the fields accessed by
560/// the VRL program as informed by `ProgramInfo`.
561fn precompute_metric_value(metric: &Metric, info: &ProgramInfo, multi_value_tags: bool) -> Value {
562    struct MetricProperty {
563        property: &'static str,
564        getter: fn(&Metric) -> Option<Value>,
565        set: bool,
566    }
567
568    impl MetricProperty {
569        fn new(property: &'static str, getter: fn(&Metric) -> Option<Value>) -> Self {
570            Self {
571                property,
572                getter,
573                set: false,
574            }
575        }
576
577        fn insert(&mut self, metric: &Metric, map: &mut ObjectMap) {
578            if self.set {
579                return;
580            }
581            if let Some(value) = (self.getter)(metric) {
582                map.insert(self.property.into(), value);
583                self.set = true;
584            }
585        }
586    }
587
588    fn get_single_value_tags(metric: &Metric) -> Option<Value> {
589        metric.tags().cloned().map(|tags| {
590            tags.into_iter_single()
591                .map(|(tag, value)| (tag.into(), value.into()))
592                .collect::<ObjectMap>()
593                .into()
594        })
595    }
596
597    fn get_multi_value_tags(metric: &Metric) -> Option<Value> {
598        metric.tags().cloned().map(|tags| {
599            tags.iter_sets()
600                .map(|(tag, tag_set)| {
601                    let array_values: Vec<Value> = tag_set
602                        .iter()
603                        .map(|v| match v {
604                            Some(s) => Value::Bytes(s.as_bytes().to_vec().into()),
605                            None => Value::Null,
606                        })
607                        .collect();
608                    (tag.into(), Value::Array(array_values))
609                })
610                .collect::<ObjectMap>()
611                .into()
612        })
613    }
614
615    let mut name = MetricProperty::new("name", |metric| Some(metric.name().to_owned().into()));
616    let mut kind = MetricProperty::new("kind", |metric| Some(metric.kind().into()));
617    let mut type_ = MetricProperty::new("type", |metric| Some(metric.value().clone().into()));
618    let mut namespace = MetricProperty::new("namespace", |metric| {
619        metric.namespace().map(String::from).map(Into::into)
620    });
621    let mut interval_ms =
622        MetricProperty::new("interval_ms", |metric| metric.interval_ms().map(Into::into));
623    let mut timestamp =
624        MetricProperty::new("timestamp", |metric| metric.timestamp().map(Into::into));
625    let mut tags = MetricProperty::new(
626        "tags",
627        if multi_value_tags {
628            get_multi_value_tags
629        } else {
630            get_single_value_tags
631        },
632    );
633
634    let mut map = ObjectMap::default();
635
636    for target_path in &info.target_queries {
637        // Accessing a root path requires us to pre-populate all fields.
638        if target_path == &OwnedTargetPath::event_root() {
639            let mut properties = [
640                &mut name,
641                &mut kind,
642                &mut type_,
643                &mut namespace,
644                &mut interval_ms,
645                &mut timestamp,
646                &mut tags,
647            ];
648            for property in &mut properties {
649                property.insert(metric, &mut map);
650            }
651            break;
652        }
653
654        // For non-root paths, we continuously populate the value with the
655        // relevant data.
656        if let Some(OwnedSegment::Field(field)) = target_path.path.segments.first() {
657            let property = match field.as_ref() {
658                "name" => Some(&mut name),
659                "kind" => Some(&mut kind),
660                "type" => Some(&mut type_),
661                "namespace" => Some(&mut namespace),
662                "timestamp" => Some(&mut timestamp),
663                "interval_ms" => Some(&mut interval_ms),
664                "tags" => Some(&mut tags),
665                _ => None,
666            };
667            if let Some(property) = property {
668                property.insert(metric, &mut map);
669            }
670        }
671    }
672
673    map.into()
674}
675
676#[derive(Debug, Snafu)]
677enum MetricPathError<'a> {
678    #[snafu(display("cannot set root path"))]
679    SetPathError,
680
681    #[snafu(display("invalid path {}: expected one of {}", path, expected))]
682    InvalidPath { path: &'a str, expected: &'a str },
683}
684
685#[cfg(test)]
686mod test {
687    use chrono::{Utc, offset::TimeZone};
688    use lookup::owned_value_path;
689    use similar_asserts::assert_eq;
690    use vrl::{btreemap, value::kind::Index};
691
692    use super::{super::MetricValue, *};
693    use crate::metric_tags;
694
695    #[test]
696    fn test_field_definitions_in_message() {
697        let definition =
698            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]);
699        assert_eq!(
700            Definition::new_with_default_metadata(
701                Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])),
702                [LogNamespace::Legacy]
703            ),
704            move_field_definitions_into_message(definition)
705        );
706
707        // Test when a message field already exists.
708        let definition = Definition::new_with_default_metadata(
709            Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(),
710            [LogNamespace::Legacy],
711        );
712        assert_eq!(
713            Definition::new_with_default_metadata(
714                Kind::object(BTreeMap::from([(
715                    "message".into(),
716                    Kind::bytes().or_integer()
717                )])),
718                [LogNamespace::Legacy]
719            ),
720            move_field_definitions_into_message(definition)
721        );
722    }
723
724    #[test]
725    fn test_merged_array_definitions_simple() {
726        // Test merging the array definitions where the schema definition
727        // is simple, containing only one possible type in the array.
728        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
729            ("carrot".into(), Kind::bytes()),
730            ("potato".into(), Kind::integer()),
731        ]
732        .into();
733
734        let kind = Kind::array(Collection::from_unknown(Kind::object(object)));
735
736        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
737
738        let kind = Kind::object(BTreeMap::from([
739            ("carrot".into(), Kind::bytes()),
740            ("potato".into(), Kind::integer()),
741        ]));
742
743        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
744        let merged = merge_array_definitions(definition);
745
746        assert_eq!(wanted, merged);
747    }
748
749    #[test]
750    fn test_merged_array_definitions_complex() {
751        // Test merging the array definitions where the schema definition
752        // is fairly complex containing multiple different possible types.
753        let object: BTreeMap<vrl::value::kind::Field, Kind> = [
754            ("carrot".into(), Kind::bytes()),
755            ("potato".into(), Kind::integer()),
756        ]
757        .into();
758
759        let array: BTreeMap<Index, Kind> = [
760            (Index::from(0), Kind::integer()),
761            (Index::from(1), Kind::boolean()),
762            (
763                Index::from(2),
764                Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])),
765            ),
766        ]
767        .into();
768
769        let mut kind = Kind::bytes();
770        kind.add_object(object);
771        kind.add_array(array);
772
773        let definition = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
774
775        let mut kind = Kind::bytes();
776        kind.add_integer();
777        kind.add_boolean();
778        kind.add_object(BTreeMap::from([
779            ("carrot".into(), Kind::bytes().or_undefined()),
780            ("potato".into(), Kind::integer().or_undefined()),
781            ("peas".into(), Kind::bytes().or_undefined()),
782        ]));
783
784        let wanted = Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]);
785        let merged = merge_array_definitions(definition);
786
787        assert_eq!(wanted, merged);
788    }
789
790    #[test]
791    fn log_get() {
792        let cases = vec![
793            (
794                BTreeMap::new(),
795                owned_value_path!(),
796                Ok(Some(BTreeMap::new().into())),
797            ),
798            (
799                BTreeMap::from([("foo".into(), "bar".into())]),
800                owned_value_path!(),
801                Ok(Some(BTreeMap::from([("foo".into(), "bar".into())]).into())),
802            ),
803            (
804                BTreeMap::from([("foo".into(), "bar".into())]),
805                owned_value_path!("foo"),
806                Ok(Some("bar".into())),
807            ),
808            (
809                BTreeMap::from([("foo".into(), "bar".into())]),
810                owned_value_path!("bar"),
811                Ok(None),
812            ),
813            (
814                btreemap! { "foo" => vec![btreemap! { "bar" => true }] },
815                owned_value_path!("foo", 0, "bar"),
816                Ok(Some(true.into())),
817            ),
818            (
819                btreemap! { "foo" => btreemap! { "bar baz" => btreemap! { "baz" => 2 } } },
820                owned_value_path!("foo", r"bar baz", "baz"),
821                Ok(Some(2.into())),
822            ),
823        ];
824
825        for (value, path, expect) in cases {
826            let value: ObjectMap = value;
827            let info = ProgramInfo {
828                fallible: false,
829                abortable: false,
830                target_queries: vec![],
831                target_assignments: vec![],
832            };
833            let target = VrlTarget::new(Event::Log(LogEvent::from(value)), &info, false);
834            let path = OwnedTargetPath::event(path);
835
836            assert_eq!(
837                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
838                expect
839            );
840        }
841    }
842
843    #[allow(clippy::too_many_lines)]
844    #[test]
845    fn log_insert() {
846        let cases = vec![
847            (
848                BTreeMap::from([("foo".into(), "bar".into())]),
849                owned_value_path!(0),
850                btreemap! { "baz" => "qux" }.into(),
851                btreemap! { "baz" => "qux" },
852                Ok(()),
853            ),
854            (
855                BTreeMap::from([("foo".into(), "bar".into())]),
856                owned_value_path!("foo"),
857                "baz".into(),
858                btreemap! { "foo" => "baz" },
859                Ok(()),
860            ),
861            (
862                BTreeMap::from([("foo".into(), "bar".into())]),
863                owned_value_path!("foo", 2, "bar baz", "a", "b"),
864                true.into(),
865                btreemap! {
866                    "foo" => vec![
867                        Value::Null,
868                        Value::Null,
869                        btreemap! {
870                            "bar baz" => btreemap! { "a" => btreemap! { "b" => true } },
871                        }.into()
872                    ]
873                },
874                Ok(()),
875            ),
876            (
877                btreemap! { "foo" => vec![0, 1, 2] },
878                owned_value_path!("foo", 5),
879                "baz".into(),
880                btreemap! {
881                    "foo" => vec![
882                        0.into(),
883                        1.into(),
884                        2.into(),
885                        Value::Null,
886                        Value::Null,
887                        Value::from("baz"),
888                    ],
889                },
890                Ok(()),
891            ),
892            (
893                BTreeMap::from([("foo".into(), "bar".into())]),
894                owned_value_path!("foo", 0),
895                "baz".into(),
896                btreemap! { "foo" => vec!["baz"] },
897                Ok(()),
898            ),
899            (
900                btreemap! { "foo" => Value::Array(vec![]) },
901                owned_value_path!("foo", 0),
902                "baz".into(),
903                btreemap! { "foo" => vec!["baz"] },
904                Ok(()),
905            ),
906            (
907                btreemap! { "foo" => Value::Array(vec![0.into()]) },
908                owned_value_path!("foo", 0),
909                "baz".into(),
910                btreemap! { "foo" => vec!["baz"] },
911                Ok(()),
912            ),
913            (
914                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
915                owned_value_path!("foo", 0),
916                "baz".into(),
917                btreemap! { "foo" => Value::Array(vec!["baz".into(), 1.into()]) },
918                Ok(()),
919            ),
920            (
921                btreemap! { "foo" => Value::Array(vec![0.into(), 1.into()]) },
922                owned_value_path!("foo", 1),
923                "baz".into(),
924                btreemap! { "foo" => Value::Array(vec![0.into(), "baz".into()]) },
925                Ok(()),
926            ),
927        ];
928
929        for (object, path, value, expect, result) in cases {
930            let object: ObjectMap = object;
931            let info = ProgramInfo {
932                fallible: false,
933                abortable: false,
934                target_queries: vec![],
935                target_assignments: vec![],
936            };
937            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
938            let expect = LogEvent::from(expect);
939            let value: Value = value;
940            let path = OwnedTargetPath::event(path);
941
942            assert_eq!(
943                Target::target_insert(&mut target, &path, value.clone()),
944                result
945            );
946            assert_eq!(
947                Target::target_get(&target, &path).map(Option::<&Value>::cloned),
948                Ok(Some(value))
949            );
950            assert_eq!(
951                match target.into_events(LogNamespace::Legacy) {
952                    TargetEvents::One(event) => vec![event],
953                    TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
954                    TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
955                }
956                .first()
957                .cloned()
958                .unwrap(),
959                Event::Log(expect)
960            );
961        }
962    }
963
964    #[test]
965    fn log_remove() {
966        let cases = vec![
967            (
968                BTreeMap::from([("foo".into(), "bar".into())]),
969                owned_value_path!("foo"),
970                false,
971                Some(BTreeMap::new().into()),
972            ),
973            (
974                BTreeMap::from([("foo".into(), "bar".into())]),
975                owned_value_path!(r"foo bar", "foo"),
976                false,
977                Some(btreemap! { "foo" => "bar"}.into()),
978            ),
979            (
980                btreemap! { "foo" => "bar", "baz" => "qux" },
981                owned_value_path!(),
982                false,
983                Some(BTreeMap::new().into()),
984            ),
985            (
986                btreemap! { "foo" => "bar", "baz" => "qux" },
987                owned_value_path!(),
988                true,
989                Some(BTreeMap::new().into()),
990            ),
991            (
992                btreemap! { "foo" => vec![0] },
993                owned_value_path!("foo", 0),
994                false,
995                Some(btreemap! { "foo" => Value::Array(vec![]) }.into()),
996            ),
997            (
998                btreemap! { "foo" => vec![0] },
999                owned_value_path!("foo", 0),
1000                true,
1001                Some(BTreeMap::new().into()),
1002            ),
1003            (
1004                btreemap! {
1005                    "foo" => btreemap! { "bar baz" => vec![0] },
1006                    "bar" => "baz",
1007                },
1008                owned_value_path!("foo", r"bar baz", 0),
1009                false,
1010                Some(
1011                    btreemap! {
1012                        "foo" => btreemap! { "bar baz" => Value::Array(vec![]) },
1013                        "bar" => "baz",
1014                    }
1015                    .into(),
1016                ),
1017            ),
1018            (
1019                btreemap! {
1020                    "foo" => btreemap! { "bar baz" => vec![0] },
1021                    "bar" => "baz",
1022                },
1023                owned_value_path!("foo", r"bar baz", 0),
1024                true,
1025                Some(btreemap! { "bar" => "baz" }.into()),
1026            ),
1027        ];
1028
1029        for (object, path, compact, expect) in cases {
1030            let info = ProgramInfo {
1031                fallible: false,
1032                abortable: false,
1033                target_queries: vec![],
1034                target_assignments: vec![],
1035            };
1036            let mut target = VrlTarget::new(Event::Log(LogEvent::from(object)), &info, false);
1037            let path = OwnedTargetPath::event(path);
1038            let removed = Target::target_get(&target, &path).unwrap().cloned();
1039
1040            assert_eq!(
1041                Target::target_remove(&mut target, &path, compact),
1042                Ok(removed)
1043            );
1044            assert_eq!(
1045                Target::target_get(&target, &OwnedTargetPath::event_root())
1046                    .map(Option::<&Value>::cloned),
1047                Ok(expect)
1048            );
1049        }
1050    }
1051
1052    #[test]
1053    fn log_into_events() {
1054        use vrl::btreemap;
1055
1056        let cases = vec![
1057            (
1058                Value::from(btreemap! {"foo" => "bar"}),
1059                vec![btreemap! {"foo" => "bar"}],
1060            ),
1061            (Value::from(1), vec![btreemap! {"message" => 1}]),
1062            (Value::from("2"), vec![btreemap! {"message" => "2"}]),
1063            (Value::from(true), vec![btreemap! {"message" => true}]),
1064            (
1065                Value::from(vec![
1066                    Value::from(1),
1067                    Value::from("2"),
1068                    Value::from(true),
1069                    Value::from(btreemap! {"foo" => "bar"}),
1070                ]),
1071                vec![
1072                    btreemap! {"message" => 1},
1073                    btreemap! {"message" => "2"},
1074                    btreemap! {"message" => true},
1075                    btreemap! {"foo" => "bar"},
1076                ],
1077            ),
1078        ];
1079
1080        for (value, expect) in cases {
1081            let metadata = EventMetadata::default();
1082            let info = ProgramInfo {
1083                fallible: false,
1084                abortable: false,
1085                target_queries: vec![],
1086                target_assignments: vec![],
1087            };
1088            let mut target = VrlTarget::new(
1089                Event::Log(LogEvent::new_with_metadata(metadata.clone())),
1090                &info,
1091                false,
1092            );
1093
1094            Target::target_insert(&mut target, &OwnedTargetPath::event_root(), value).unwrap();
1095
1096            assert_eq!(
1097                match target.into_events(LogNamespace::Legacy) {
1098                    TargetEvents::One(event) => vec![event],
1099                    TargetEvents::Logs(events) => events.collect::<Vec<_>>(),
1100                    TargetEvents::Traces(events) => events.collect::<Vec<_>>(),
1101                },
1102                expect
1103                    .into_iter()
1104                    .map(|v| Event::Log(LogEvent::from_map(v, metadata.clone())))
1105                    .collect::<Vec<_>>()
1106            );
1107        }
1108    }
1109
1110    #[test]
1111    fn metric_all_fields() {
1112        let metric = Metric::new(
1113            "zub",
1114            MetricKind::Absolute,
1115            MetricValue::Counter { value: 1.23 },
1116        )
1117        .with_namespace(Some("zoob"))
1118        .with_tags(Some(metric_tags!("tig" => "tog")))
1119        .with_timestamp(Some(
1120            Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0)
1121                .single()
1122                .expect("invalid timestamp"),
1123        ))
1124        .with_interval_ms(Some(NonZero::<u32>::new(507).unwrap()));
1125
1126        let info = ProgramInfo {
1127            fallible: false,
1128            abortable: false,
1129            target_queries: vec![
1130                OwnedTargetPath::event(owned_value_path!("name")),
1131                OwnedTargetPath::event(owned_value_path!("namespace")),
1132                OwnedTargetPath::event(owned_value_path!("interval_ms")),
1133                OwnedTargetPath::event(owned_value_path!("timestamp")),
1134                OwnedTargetPath::event(owned_value_path!("kind")),
1135                OwnedTargetPath::event(owned_value_path!("type")),
1136                OwnedTargetPath::event(owned_value_path!("tags")),
1137            ],
1138            target_assignments: vec![],
1139        };
1140        let target = VrlTarget::new(Event::Metric(metric), &info, false);
1141
1142        assert_eq!(
1143            Ok(Some(
1144                btreemap! {
1145                    "name" => "zub",
1146                    "namespace" => "zoob",
1147                    "interval_ms" => 507,
1148                    "timestamp" => Utc.with_ymd_and_hms(2020, 12, 10, 12, 0, 0).single().expect("invalid timestamp"),
1149                    "tags" => btreemap! { "tig" => "tog" },
1150                    "kind" => "absolute",
1151                    "type" => "counter",
1152                }
1153                .into()
1154            )),
1155            target
1156                .target_get(&OwnedTargetPath::event_root())
1157                .map(Option::<&Value>::cloned)
1158        );
1159    }
1160
1161    #[test]
1162    fn metric_fields() {
1163        struct Case {
1164            path: OwnedValuePath,
1165            current: Option<Value>,
1166            new: Value,
1167            delete: bool,
1168        }
1169
1170        let metric = Metric::new(
1171            "name",
1172            MetricKind::Absolute,
1173            MetricValue::Counter { value: 1.23 },
1174        )
1175        .with_tags(Some(metric_tags!("tig" => "tog")));
1176
1177        let cases = vec![
1178            Case {
1179                path: owned_value_path!("name"),
1180                current: Some(Value::from("name")),
1181                new: Value::from("namefoo"),
1182                delete: false,
1183            },
1184            Case {
1185                path: owned_value_path!("namespace"),
1186                current: None,
1187                new: "namespacefoo".into(),
1188                delete: true,
1189            },
1190            Case {
1191                path: owned_value_path!("timestamp"),
1192                current: None,
1193                new: Utc
1194                    .with_ymd_and_hms(2020, 12, 8, 12, 0, 0)
1195                    .single()
1196                    .expect("invalid timestamp")
1197                    .into(),
1198                delete: true,
1199            },
1200            Case {
1201                path: owned_value_path!("interval_ms"),
1202                current: None,
1203                new: 123_456.into(),
1204                delete: true,
1205            },
1206            Case {
1207                path: owned_value_path!("kind"),
1208                current: Some(Value::from("absolute")),
1209                new: "incremental".into(),
1210                delete: false,
1211            },
1212            Case {
1213                path: owned_value_path!("tags", "thing"),
1214                current: None,
1215                new: "footag".into(),
1216                delete: true,
1217            },
1218        ];
1219
1220        let info = ProgramInfo {
1221            fallible: false,
1222            abortable: false,
1223            target_queries: vec![
1224                OwnedTargetPath::event(owned_value_path!("name")),
1225                OwnedTargetPath::event(owned_value_path!("namespace")),
1226                OwnedTargetPath::event(owned_value_path!("timestamp")),
1227                OwnedTargetPath::event(owned_value_path!("interval_ms")),
1228                OwnedTargetPath::event(owned_value_path!("kind")),
1229            ],
1230            target_assignments: vec![],
1231        };
1232        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1233
1234        for Case {
1235            path,
1236            current,
1237            new,
1238            delete,
1239        } in cases
1240        {
1241            let path = OwnedTargetPath::event(path);
1242
1243            assert_eq!(
1244                Ok(current),
1245                target.target_get(&path).map(Option::<&Value>::cloned)
1246            );
1247            assert_eq!(Ok(()), target.target_insert(&path, new.clone()));
1248            assert_eq!(
1249                Ok(Some(new.clone())),
1250                target.target_get(&path).map(Option::<&Value>::cloned)
1251            );
1252
1253            if delete {
1254                assert_eq!(Ok(Some(new)), target.target_remove(&path, true));
1255                assert_eq!(
1256                    Ok(None),
1257                    target.target_get(&path).map(Option::<&Value>::cloned)
1258                );
1259            }
1260        }
1261    }
1262
1263    #[test]
1264    fn metric_set_tags() {
1265        let metric = Metric::new(
1266            "name",
1267            MetricKind::Absolute,
1268            MetricValue::Counter { value: 1.23 },
1269        )
1270        .with_tags(Some(metric_tags!("tig" => "tog")));
1271
1272        let info = ProgramInfo {
1273            fallible: false,
1274            abortable: false,
1275            target_queries: vec![],
1276            target_assignments: vec![],
1277        };
1278        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1279        let _result = target.target_insert(
1280            &OwnedTargetPath::event(owned_value_path!("tags")),
1281            Value::Object(BTreeMap::from([("a".into(), "b".into())])),
1282        );
1283
1284        match target {
1285            VrlTarget::Metric {
1286                metric,
1287                value: _,
1288                multi_value_tags: _,
1289            } => {
1290                assert!(metric.tags().is_some());
1291                assert_eq!(metric.tags().unwrap(), &crate::metric_tags!("a" => "b"));
1292            }
1293            _ => panic!("must be a metric"),
1294        }
1295    }
1296
1297    #[test]
1298    fn metric_invalid_paths() {
1299        let metric = Metric::new(
1300            "name",
1301            MetricKind::Absolute,
1302            MetricValue::Counter { value: 1.23 },
1303        );
1304
1305        let validpaths_get = [
1306            ".name",
1307            ".namespace",
1308            ".interval_ms",
1309            ".timestamp",
1310            ".kind",
1311            ".tags",
1312            ".type",
1313        ];
1314
1315        let validpaths_set = [
1316            ".name",
1317            ".namespace",
1318            ".interval_ms",
1319            ".timestamp",
1320            ".kind",
1321            ".tags",
1322        ];
1323
1324        let info = ProgramInfo {
1325            fallible: false,
1326            abortable: false,
1327            target_queries: vec![],
1328            target_assignments: vec![],
1329        };
1330        let mut target = VrlTarget::new(Event::Metric(metric), &info, false);
1331
1332        assert_eq!(
1333            Err(format!(
1334                "invalid path zork: expected one of {}",
1335                validpaths_get.join(", ")
1336            )),
1337            target.target_get(&OwnedTargetPath::event(owned_value_path!("zork")))
1338        );
1339
1340        assert_eq!(
1341            Err(format!(
1342                "invalid path zork: expected one of {}",
1343                validpaths_set.join(", ")
1344            )),
1345            target.target_insert(
1346                &OwnedTargetPath::event(owned_value_path!("zork")),
1347                "thing".into()
1348            )
1349        );
1350
1351        assert_eq!(
1352            Err(format!(
1353                "invalid path zork: expected one of {}",
1354                validpaths_set.join(", ")
1355            )),
1356            target.target_remove(&OwnedTargetPath::event(owned_value_path!("zork")), true)
1357        );
1358
1359        assert_eq!(
1360            Err(format!(
1361                "invalid path tags.foo.flork: expected one of {}",
1362                validpaths_get.join(", ")
1363            )),
1364            target.target_get(&OwnedTargetPath::event(owned_value_path!(
1365                "tags", "foo", "flork"
1366            )))
1367        );
1368    }
1369
1370    #[test]
1371    fn test_metric_insert_get_multi_value_tag() {
1372        let metric = Metric::new(
1373            "name",
1374            MetricKind::Absolute,
1375            MetricValue::Counter { value: 1.23 },
1376        );
1377        let info = ProgramInfo {
1378            fallible: false,
1379            abortable: false,
1380            target_queries: vec![],
1381            target_assignments: vec![],
1382        };
1383
1384        let mut target = VrlTarget::new(Event::Metric(metric), &info, true);
1385
1386        let value = Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()]);
1387        target
1388            .target_insert(
1389                &OwnedTargetPath::event(owned_value_path!("tags", "foo")),
1390                value,
1391            )
1392            .unwrap();
1393
1394        let vrl_tags_value = target
1395            .target_get(&OwnedTargetPath::event(owned_value_path!("tags")))
1396            .unwrap()
1397            .unwrap();
1398
1399        assert_eq!(
1400            vrl_tags_value,
1401            &Value::Object(BTreeMap::from([(
1402                "foo".into(),
1403                Value::Array(vec!["a".into(), "".into(), Value::Null, "b".into()])
1404            )]))
1405        );
1406
1407        let VrlTarget::Metric { metric, .. } = target else {
1408            unreachable!()
1409        };
1410
1411        // get single value (should be the last one)
1412        assert_eq!(metric.tag_value("foo"), Some("b".into()));
1413    }
1414}