vector/transforms/
metric_to_log.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::Utc;
4use serde_json::Value;
5use vector_lib::{
6    TimeZone,
7    codecs::MetricTagValues,
8    config::LogNamespace,
9    configurable::configurable_component,
10    lookup::{PathPrefix, event_path, owned_value_path, path},
11};
12use vrl::{
13    path::OwnedValuePath,
14    value::{Kind, kind::Collection},
15};
16
17use crate::{
18    config::{
19        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
20        TransformOutput, log_schema,
21    },
22    event::{self, Event, LogEvent, Metric},
23    internal_events::MetricToLogSerializeError,
24    schema::Definition,
25    transforms::{FunctionTransform, OutputBuffer, Transform},
26    types::Conversion,
27};
28
29/// Configuration for the `metric_to_log` transform.
30#[configurable_component(transform("metric_to_log", "Convert metric events to log events."))]
31#[derive(Clone, Debug, Default)]
32#[serde(deny_unknown_fields)]
33pub struct MetricToLogConfig {
34    /// Name of the tag in the metric to use for the source host.
35    ///
36    /// If present, the value of the tag is set on the generated log event in the `host` field,
37    /// where the field key uses the [global `host_key` option][global_log_schema_host_key].
38    ///
39    /// [global_log_schema_host_key]: https://vector.dev/docs/reference/configuration//global-options#log_schema.host_key
40    #[configurable(metadata(docs::examples = "host", docs::examples = "hostname"))]
41    pub host_tag: Option<String>,
42
43    /// The name of the time zone to apply to timestamp conversions that do not contain an explicit
44    /// time zone.
45    ///
46    /// This overrides the [global `timezone`][global_timezone] option. The time zone name may be
47    /// any name in the [TZ database][tz_database] or `local` to indicate system local time.
48    ///
49    /// [global_timezone]: https://vector.dev/docs/reference/configuration//global-options#timezone
50    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
51    pub timezone: Option<TimeZone>,
52
53    /// The namespace to use for logs. This overrides the global setting.
54    #[serde(default)]
55    #[configurable(metadata(docs::hidden))]
56    pub log_namespace: Option<bool>,
57
58    /// Controls how metric tag values are encoded.
59    ///
60    /// When set to `single`, only the last non-bare value of tags is displayed with the
61    /// metric.  When set to `full`, all metric tags are exposed as separate assignments as
62    /// described by [the `native_json` codec][vector_native_json].
63    ///
64    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
65    #[serde(default)]
66    pub metric_tag_values: MetricTagValues,
67}
68
69impl MetricToLogConfig {
70    pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
71        MetricToLog::new(
72            self.host_tag.as_deref(),
73            self.timezone.unwrap_or_else(|| context.globals.timezone()),
74            context.log_namespace(self.log_namespace),
75            self.metric_tag_values,
76        )
77    }
78}
79
80impl GenerateConfig for MetricToLogConfig {
81    fn generate_config() -> toml::Value {
82        toml::Value::try_from(Self {
83            host_tag: Some("host-tag".to_string()),
84            timezone: None,
85            log_namespace: None,
86            metric_tag_values: MetricTagValues::Single,
87        })
88        .unwrap()
89    }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "metric_to_log")]
94impl TransformConfig for MetricToLogConfig {
95    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
96        Ok(Transform::function(self.build_transform(context)))
97    }
98
99    fn input(&self) -> Input {
100        Input::metric()
101    }
102
103    fn outputs(
104        &self,
105        context: &TransformContext,
106        input_definitions: &[(OutputId, Definition)],
107    ) -> Vec<TransformOutput> {
108        let log_namespace = context.schema.log_namespace().merge(self.log_namespace);
109        let schema_definition = schema_definition(log_namespace);
110
111        vec![TransformOutput::new(
112            DataType::Log,
113            input_definitions
114                .iter()
115                .map(|(output, _)| (output.clone(), schema_definition.clone()))
116                .collect(),
117        )]
118    }
119
120    fn enable_concurrency(&self) -> bool {
121        true
122    }
123}
124
125fn schema_definition(log_namespace: LogNamespace) -> Definition {
126    let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace]))
127        .with_event_field(&owned_value_path!("name"), Kind::bytes(), None)
128        .with_event_field(
129            &owned_value_path!("namespace"),
130            Kind::bytes().or_undefined(),
131            None,
132        )
133        .with_event_field(
134            &owned_value_path!("tags"),
135            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
136            None,
137        )
138        .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None)
139        .with_event_field(
140            &owned_value_path!("counter"),
141            Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
142            None,
143        )
144        .with_event_field(
145            &owned_value_path!("gauge"),
146            Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
147            None,
148        )
149        .with_event_field(
150            &owned_value_path!("set"),
151            Kind::object(Collection::empty().with_known(
152                "values",
153                Kind::array(Collection::empty().with_unknown(Kind::bytes())),
154            ))
155            .or_undefined(),
156            None,
157        )
158        .with_event_field(
159            &owned_value_path!("distribution"),
160            Kind::object(
161                Collection::empty()
162                    .with_known(
163                        "samples",
164                        Kind::array(
165                            Collection::empty().with_unknown(Kind::object(
166                                Collection::empty()
167                                    .with_known("value", Kind::float())
168                                    .with_known("rate", Kind::integer()),
169                            )),
170                        ),
171                    )
172                    .with_known("statistic", Kind::bytes()),
173            )
174            .or_undefined(),
175            None,
176        )
177        .with_event_field(
178            &owned_value_path!("aggregated_histogram"),
179            Kind::object(
180                Collection::empty()
181                    .with_known(
182                        "buckets",
183                        Kind::array(
184                            Collection::empty().with_unknown(Kind::object(
185                                Collection::empty()
186                                    .with_known("upper_limit", Kind::float())
187                                    .with_known("count", Kind::integer()),
188                            )),
189                        ),
190                    )
191                    .with_known("count", Kind::integer())
192                    .with_known("sum", Kind::float()),
193            )
194            .or_undefined(),
195            None,
196        )
197        .with_event_field(
198            &owned_value_path!("aggregated_summary"),
199            Kind::object(
200                Collection::empty()
201                    .with_known(
202                        "quantiles",
203                        Kind::array(
204                            Collection::empty().with_unknown(Kind::object(
205                                Collection::empty()
206                                    .with_known("quantile", Kind::float())
207                                    .with_known("value", Kind::float()),
208                            )),
209                        ),
210                    )
211                    .with_known("count", Kind::integer())
212                    .with_known("sum", Kind::float()),
213            )
214            .or_undefined(),
215            None,
216        )
217        .with_event_field(
218            &owned_value_path!("sketch"),
219            Kind::any().or_undefined(),
220            None,
221        );
222
223    match log_namespace {
224        LogNamespace::Vector => {
225            // from serializing the Metric (Legacy moves it to another field)
226            schema_definition = schema_definition.with_event_field(
227                &owned_value_path!("timestamp"),
228                Kind::bytes().or_undefined(),
229                None,
230            );
231
232            // This is added as a "marker" field to determine which namespace is being used at runtime.
233            // This is normally handled automatically by sources, but this is a special case.
234            schema_definition = schema_definition.with_metadata_field(
235                &owned_value_path!("vector"),
236                Kind::object(Collection::empty()),
237                None,
238            );
239        }
240        LogNamespace::Legacy => {
241            if let Some(timestamp_key) = log_schema().timestamp_key() {
242                schema_definition =
243                    schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None);
244            }
245
246            schema_definition = schema_definition.with_event_field(
247                log_schema().host_key().expect("valid host key"),
248                Kind::bytes().or_undefined(),
249                None,
250            );
251        }
252    }
253    schema_definition
254}
255
256#[derive(Clone, Debug)]
257pub struct MetricToLog {
258    host_tag: Option<OwnedValuePath>,
259    timezone: TimeZone,
260    log_namespace: LogNamespace,
261    tag_values: MetricTagValues,
262}
263
264impl MetricToLog {
265    pub fn new(
266        host_tag: Option<&str>,
267        timezone: TimeZone,
268        log_namespace: LogNamespace,
269        tag_values: MetricTagValues,
270    ) -> Self {
271        Self {
272            host_tag: host_tag.map_or(
273                log_schema().host_key().cloned().map(|mut key| {
274                    key.push_front_field("tags");
275                    key
276                }),
277                |host| Some(owned_value_path!("tags", host)),
278            ),
279            timezone,
280            log_namespace,
281            tag_values,
282        }
283    }
284
285    pub fn transform_one(&self, mut metric: Metric) -> Option<LogEvent> {
286        if self.tag_values == MetricTagValues::Single {
287            metric.reduce_tags_to_single();
288        }
289        serde_json::to_value(&metric)
290            .map_err(|error| emit!(MetricToLogSerializeError { error }))
291            .ok()
292            .and_then(|value| match value {
293                Value::Object(object) => {
294                    let (_, _, metadata) = metric.into_parts();
295                    let mut log = LogEvent::new_with_metadata(metadata);
296
297                    // converting all fields from serde `Value` to Vector `Value`
298                    for (key, value) in object {
299                        log.insert(event_path!(&key), value);
300                    }
301
302                    if self.log_namespace == LogNamespace::Legacy {
303                        // "Vector" namespace just leaves the `timestamp` in place.
304
305                        let timestamp = log
306                            .remove(event_path!("timestamp"))
307                            .and_then(|value| {
308                                Conversion::Timestamp(self.timezone)
309                                    .convert(value.coerce_to_bytes())
310                                    .ok()
311                            })
312                            .unwrap_or_else(|| event::Value::Timestamp(Utc::now()));
313
314                        log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
315
316                        if let Some(host_tag) = &self.host_tag
317                            && let Some(host_value) =
318                                log.remove_prune((PathPrefix::Event, host_tag), true)
319                        {
320                            log.maybe_insert(log_schema().host_key_target_path(), host_value);
321                        }
322                    }
323                    if self.log_namespace == LogNamespace::Vector {
324                        // Create vector metadata since this is used as a marker to see which namespace is used at runtime.
325                        // This can be removed once metrics support namespacing.
326                        log.insert(
327                            (PathPrefix::Metadata, path!("vector")),
328                            vrl::value::Value::Object(BTreeMap::new()),
329                        );
330                    }
331                    Some(log)
332                }
333                _ => None,
334            })
335    }
336}
337
338impl FunctionTransform for MetricToLog {
339    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
340        let retval: Option<Event> = self
341            .transform_one(event.into_metric())
342            .map(|log| log.into());
343        output.extend(retval.into_iter())
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use std::sync::Arc;
350
351    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
352    use futures::executor::block_on;
353    use proptest::prelude::*;
354    use similar_asserts::assert_eq;
355    use tokio::sync::mpsc;
356    use tokio_stream::wrappers::ReceiverStream;
357    use vector_lib::{config::ComponentKey, event::EventMetadata, metric_tags};
358
359    use super::*;
360    use crate::{
361        event::{
362            KeyString, Metric, Value,
363            metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet},
364        },
365        test_util::{components::assert_transform_compliance, random_string},
366        transforms::test::create_topology,
367    };
368
369    #[test]
370    fn generate_config() {
371        crate::test_util::test_generate_config::<MetricToLogConfig>();
372    }
373
374    async fn do_transform(metric: Metric) -> Option<LogEvent> {
375        assert_transform_compliance(async move {
376            let config = MetricToLogConfig {
377                host_tag: Some("host".into()),
378                timezone: None,
379                log_namespace: Some(false),
380                ..Default::default()
381            };
382            let (tx, rx) = mpsc::channel(1);
383            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
384
385            tx.send(metric.into()).await.unwrap();
386
387            let result = out.recv().await;
388
389            drop(tx);
390            topology.stop().await;
391            assert_eq!(out.recv().await, None);
392
393            result
394        })
395        .await
396        .map(|e| e.into_log())
397    }
398
399    fn ts() -> DateTime<Utc> {
400        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
401            .single()
402            .and_then(|t| t.with_nanosecond(11))
403            .expect("invalid timestamp")
404    }
405
406    fn tags() -> MetricTags {
407        metric_tags! {
408            "host" => "localhost",
409            "some_tag" => "some_value",
410        }
411    }
412
413    fn event_metadata() -> EventMetadata {
414        EventMetadata::default().with_source_type("unit_test_stream")
415    }
416
417    #[tokio::test]
418    async fn transform_counter() {
419        let counter = Metric::new_with_metadata(
420            "counter",
421            MetricKind::Absolute,
422            MetricValue::Counter { value: 1.0 },
423            event_metadata(),
424        )
425        .with_tags(Some(tags()))
426        .with_timestamp(Some(ts()));
427        let mut metadata = counter.metadata().clone();
428        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
429        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
430        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
431
432        let log = do_transform(counter).await.unwrap();
433        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
434
435        assert_eq!(
436            collected,
437            vec![
438                (KeyString::from("counter.value"), &Value::from(1.0)),
439                (KeyString::from("host"), &Value::from("localhost")),
440                (KeyString::from("kind"), &Value::from("absolute")),
441                (KeyString::from("name"), &Value::from("counter")),
442                (KeyString::from("tags.some_tag"), &Value::from("some_value")),
443                (KeyString::from("timestamp"), &Value::from(ts())),
444            ]
445        );
446        assert_eq!(log.metadata(), &metadata);
447    }
448
449    #[tokio::test]
450    async fn transform_gauge() {
451        let gauge = Metric::new_with_metadata(
452            "gauge",
453            MetricKind::Absolute,
454            MetricValue::Gauge { value: 1.0 },
455            event_metadata(),
456        )
457        .with_timestamp(Some(ts()));
458        let mut metadata = gauge.metadata().clone();
459        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
460        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
461        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
462
463        let log = do_transform(gauge).await.unwrap();
464        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
465
466        assert_eq!(
467            collected,
468            vec![
469                (KeyString::from("gauge.value"), &Value::from(1.0)),
470                (KeyString::from("kind"), &Value::from("absolute")),
471                (KeyString::from("name"), &Value::from("gauge")),
472                (KeyString::from("timestamp"), &Value::from(ts())),
473            ]
474        );
475        assert_eq!(log.metadata(), &metadata);
476    }
477
478    #[tokio::test]
479    async fn transform_set() {
480        let set = Metric::new_with_metadata(
481            "set",
482            MetricKind::Absolute,
483            MetricValue::Set {
484                values: vec!["one".into(), "two".into()].into_iter().collect(),
485            },
486            event_metadata(),
487        )
488        .with_timestamp(Some(ts()));
489        let mut metadata = set.metadata().clone();
490        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
491        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
492        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
493
494        let log = do_transform(set).await.unwrap();
495        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
496
497        assert_eq!(
498            collected,
499            vec![
500                (KeyString::from("kind"), &Value::from("absolute")),
501                (KeyString::from("name"), &Value::from("set")),
502                (KeyString::from("set.values[0]"), &Value::from("one")),
503                (KeyString::from("set.values[1]"), &Value::from("two")),
504                (KeyString::from("timestamp"), &Value::from(ts())),
505            ]
506        );
507        assert_eq!(log.metadata(), &metadata);
508    }
509
510    #[tokio::test]
511    async fn transform_distribution() {
512        let distro = Metric::new_with_metadata(
513            "distro",
514            MetricKind::Absolute,
515            MetricValue::Distribution {
516                samples: vector_lib::samples![1.0 => 10, 2.0 => 20],
517                statistic: StatisticKind::Histogram,
518            },
519            event_metadata(),
520        )
521        .with_timestamp(Some(ts()));
522        let mut metadata = distro.metadata().clone();
523        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
524        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
525        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
526
527        let log = do_transform(distro).await.unwrap();
528        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
529
530        assert_eq!(
531            collected,
532            vec![
533                (
534                    KeyString::from("distribution.samples[0].rate"),
535                    &Value::from(10)
536                ),
537                (
538                    KeyString::from("distribution.samples[0].value"),
539                    &Value::from(1.0)
540                ),
541                (
542                    KeyString::from("distribution.samples[1].rate"),
543                    &Value::from(20)
544                ),
545                (
546                    KeyString::from("distribution.samples[1].value"),
547                    &Value::from(2.0)
548                ),
549                (
550                    KeyString::from("distribution.statistic"),
551                    &Value::from("histogram")
552                ),
553                (KeyString::from("kind"), &Value::from("absolute")),
554                (KeyString::from("name"), &Value::from("distro")),
555                (KeyString::from("timestamp"), &Value::from(ts())),
556            ]
557        );
558        assert_eq!(log.metadata(), &metadata);
559    }
560
561    #[tokio::test]
562    async fn transform_histogram() {
563        let histo = Metric::new_with_metadata(
564            "histo",
565            MetricKind::Absolute,
566            MetricValue::AggregatedHistogram {
567                buckets: vector_lib::buckets![1.0 => 10, 2.0 => 20],
568                count: 30,
569                sum: 50.0,
570            },
571            event_metadata(),
572        )
573        .with_timestamp(Some(ts()));
574        let mut metadata = histo.metadata().clone();
575        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
576        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
577        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
578
579        let log = do_transform(histo).await.unwrap();
580        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
581
582        assert_eq!(
583            collected,
584            vec![
585                (
586                    KeyString::from("aggregated_histogram.buckets[0].count"),
587                    &Value::from(10)
588                ),
589                (
590                    KeyString::from("aggregated_histogram.buckets[0].upper_limit"),
591                    &Value::from(1.0)
592                ),
593                (
594                    KeyString::from("aggregated_histogram.buckets[1].count"),
595                    &Value::from(20)
596                ),
597                (
598                    KeyString::from("aggregated_histogram.buckets[1].upper_limit"),
599                    &Value::from(2.0)
600                ),
601                (
602                    KeyString::from("aggregated_histogram.count"),
603                    &Value::from(30)
604                ),
605                (
606                    KeyString::from("aggregated_histogram.sum"),
607                    &Value::from(50.0)
608                ),
609                (KeyString::from("kind"), &Value::from("absolute")),
610                (KeyString::from("name"), &Value::from("histo")),
611                (KeyString::from("timestamp"), &Value::from(ts())),
612            ]
613        );
614        assert_eq!(log.metadata(), &metadata);
615    }
616
617    #[tokio::test]
618    async fn transform_summary() {
619        let summary = Metric::new_with_metadata(
620            "summary",
621            MetricKind::Absolute,
622            MetricValue::AggregatedSummary {
623                quantiles: vector_lib::quantiles![50.0 => 10.0, 90.0 => 20.0],
624                count: 30,
625                sum: 50.0,
626            },
627            event_metadata(),
628        )
629        .with_timestamp(Some(ts()));
630        let mut metadata = summary.metadata().clone();
631        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
632        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
633        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
634
635        let log = do_transform(summary).await.unwrap();
636        let collected: Vec<_> = log.all_event_fields().unwrap().collect();
637
638        assert_eq!(
639            collected,
640            vec![
641                (
642                    KeyString::from("aggregated_summary.count"),
643                    &Value::from(30)
644                ),
645                (
646                    KeyString::from("aggregated_summary.quantiles[0].quantile"),
647                    &Value::from(50.0)
648                ),
649                (
650                    KeyString::from("aggregated_summary.quantiles[0].value"),
651                    &Value::from(10.0)
652                ),
653                (
654                    KeyString::from("aggregated_summary.quantiles[1].quantile"),
655                    &Value::from(90.0)
656                ),
657                (
658                    KeyString::from("aggregated_summary.quantiles[1].value"),
659                    &Value::from(20.0)
660                ),
661                (
662                    KeyString::from("aggregated_summary.sum"),
663                    &Value::from(50.0)
664                ),
665                (KeyString::from("kind"), &Value::from("absolute")),
666                (KeyString::from("name"), &Value::from("summary")),
667                (KeyString::from("timestamp"), &Value::from(ts())),
668            ]
669        );
670        assert_eq!(log.metadata(), &metadata);
671    }
672
673    // Test the encoding of tag values with the `metric_tag_values` flag.
674    proptest! {
675        #[test]
676        fn transform_tag_single_encoding(values: TagValueSet) {
677            let name = random_string(16);
678            let tags = block_on(transform_tags(
679                MetricTagValues::Single,
680                values.iter()
681                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
682                    .collect(),
683            ));
684            // The resulting tag must be either a single string value or not present.
685            let value = values.into_single().map(|value| Value::Bytes(value.into()));
686            assert_eq!(tags.get(&*name), value.as_ref());
687        }
688
689        #[test]
690        fn transform_tag_full_encoding(values: TagValueSet) {
691            let name = random_string(16);
692            let tags = block_on(transform_tags(
693                MetricTagValues::Full,
694                values.iter()
695                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
696                    .collect(),
697            ));
698            let tag = tags.get(&*name);
699            match values.len() {
700                // Empty tag set => missing tag
701                0 => assert_eq!(tag, None),
702                // Single value tag => scalar value
703                1 => assert_eq!(tag, Some(&tag_to_value(values.into_iter().next().unwrap()))),
704                // Multi-valued tag => array value
705                _ => assert_eq!(tag, Some(&Value::Array(values.into_iter().map(tag_to_value).collect()))),
706            }
707        }
708    }
709
710    fn tag_to_value(tag: TagValue) -> Value {
711        tag.into_option().into()
712    }
713
714    async fn transform_tags(metric_tag_values: MetricTagValues, tags: MetricTags) -> Value {
715        let counter = Metric::new(
716            "counter",
717            MetricKind::Absolute,
718            MetricValue::Counter { value: 1.0 },
719        )
720        .with_tags(Some(tags))
721        .with_timestamp(Some(ts()));
722
723        let mut output = OutputBuffer::with_capacity(1);
724
725        MetricToLogConfig {
726            metric_tag_values,
727            ..Default::default()
728        }
729        .build_transform(&TransformContext::default())
730        .transform(&mut output, counter.into());
731
732        assert_eq!(output.len(), 1);
733        output.into_events().next().unwrap().into_log()["tags"].clone()
734    }
735}