vector/transforms/
log_to_metric.rs

1use std::{collections::HashMap, num::ParseFloatError, sync::Arc};
2
3use chrono::Utc;
4use indexmap::IndexMap;
5use vector_lib::{
6    config::LogNamespace,
7    configurable::configurable_component,
8    event::{
9        DatadogMetricOriginMetadata, LogEvent,
10        metric::{Bucket, Quantile, Sample},
11    },
12};
13use vrl::{
14    event_path, path,
15    path::{PathParseError, parse_target_path},
16};
17
18use crate::{
19    common::expansion::pair_expansion,
20    config::{
21        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
22        TransformOutput, schema::Definition,
23    },
24    event::{
25        Event, Value,
26        metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
27    },
28    internal_events::{
29        DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
30        MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
31        MetricMetadataParseError, ParserMissingFieldError,
32    },
33    schema,
34    template::{Template, TemplateRenderingError},
35    transforms::{
36        FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
37    },
38};
39
40const ORIGIN_SERVICE_VALUE: u32 = 3;
41
42/// Configuration for the `log_to_metric` transform.
43#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct LogToMetricConfig {
47    /// A list of metrics to generate.
48    pub metrics: Option<Vec<MetricConfig>>,
49
50    /// Setting this flag changes the behavior of this transformation.
51    /// Notably the `metrics` field will be ignored.
52    /// All incoming events will be processed and if possible they will be converted to log events.
53    /// Otherwise, only items specified in the `metrics` field will be processed.
54    ///
55    /// Example:
56    /// <pre class="chroma"><code class="language-toml" data-lang="toml">{
57    ///     "counter": {
58    ///         "value": 10.0
59    ///     },
60    ///     "kind": "incremental",
61    ///     "name": "test.transform.counter",
62    ///     "tags": {
63    ///         "env": "test_env",
64    ///         "host": "localhost"
65    ///     }
66    /// }
67    /// </code></pre>
68    ///
69    /// This is a JSON representation of a counter with the following properties:
70    ///
71    /// - `counter`: An object with a single property `value` representing the counter value, in this case, `10.0`).
72    /// - `kind`: A string indicating the kind of counter, in this case, "incremental".
73    /// - `name`: A string representing the name of the counter, here set to "test.transform.counter".
74    /// - `tags`: An object containing additional tags such as "env" and "host".
75    ///
76    /// Objects that can be processed include counter, histogram, gauge, set and summary.
77    pub all_metrics: Option<bool>,
78}
79
80/// Specification of a counter derived from a log event.
81#[configurable_component]
82#[derive(Clone, Debug)]
83pub struct CounterConfig {
84    /// Increments the counter by the value in `field`, instead of only by `1`.
85    #[serde(default = "default_increment_by_value")]
86    pub increment_by_value: bool,
87
88    #[configurable(derived)]
89    #[serde(default = "default_kind")]
90    pub kind: MetricKind,
91}
92
93/// Specification of a metric derived from a log event.
94// TODO: While we're resolving the schema for this enum somewhat reasonably (in
95// `generate-components-docs.rb`), we have a problem where an overlapping field (overlap between two
96// or more of the subschemas) takes the details of the last subschema to be iterated over that
97// contains that field, such that, for example, the `Summary` variant below is overriding the
98// description for almost all of the fields because they're shared across all of the variants.
99#[configurable_component]
100#[derive(Clone, Debug)]
101pub struct MetricConfig {
102    /// Name of the field in the event to generate the metric.
103    pub field: Template,
104
105    /// Overrides the name of the counter.
106    ///
107    /// If not specified, `field` is used as the name of the metric.
108    pub name: Option<Template>,
109
110    /// Sets the namespace for the metric.
111    pub namespace: Option<Template>,
112
113    /// Tags to apply to the metric.
114    ///
115    /// Both keys and values can be templated, allowing you to attach dynamic tags to events.
116    ///
117    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
118    pub tags: Option<IndexMap<Template, TagConfig>>,
119
120    #[configurable(derived)]
121    #[serde(flatten)]
122    pub metric: MetricTypeConfig,
123}
124
125/// Specification of the value of a created tag.
126///
127/// This may be a single value, a `null` for a bare tag, or an array of either.
128#[configurable_component]
129#[derive(Clone, Debug)]
130#[serde(untagged)]
131pub enum TagConfig {
132    /// A single tag value.
133    Plain(Option<Template>),
134
135    /// An array of values to give to the same tag name.
136    Multi(Vec<Option<Template>>),
137}
138
139/// Specification of the type of an individual metric, and any associated data.
140#[configurable_component]
141#[derive(Clone, Debug)]
142#[serde(tag = "type", rename_all = "snake_case")]
143#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
144pub enum MetricTypeConfig {
145    /// A counter.
146    Counter(CounterConfig),
147
148    /// A histogram.
149    Histogram,
150
151    /// A gauge.
152    Gauge,
153
154    /// A set.
155    Set,
156
157    /// A summary.
158    Summary,
159}
160
161impl MetricConfig {
162    fn field(&self) -> &str {
163        self.field.get_ref()
164    }
165}
166
167const fn default_increment_by_value() -> bool {
168    false
169}
170
171const fn default_kind() -> MetricKind {
172    MetricKind::Incremental
173}
174
175#[derive(Debug, Clone)]
176pub struct LogToMetric {
177    pub metrics: Vec<MetricConfig>,
178    pub all_metrics: bool,
179}
180
181impl GenerateConfig for LogToMetricConfig {
182    fn generate_config() -> toml::Value {
183        toml::Value::try_from(Self {
184            metrics: Some(vec![MetricConfig {
185                field: "field_name".try_into().expect("Fixed template"),
186                name: None,
187                namespace: None,
188                tags: None,
189                metric: MetricTypeConfig::Counter(CounterConfig {
190                    increment_by_value: false,
191                    kind: MetricKind::Incremental,
192                }),
193            }]),
194            all_metrics: Some(true),
195        })
196        .unwrap()
197    }
198}
199
200#[async_trait::async_trait]
201#[typetag::serde(name = "log_to_metric")]
202impl TransformConfig for LogToMetricConfig {
203    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
204        Ok(Transform::function(LogToMetric {
205            metrics: self.metrics.clone().unwrap_or_default(),
206            all_metrics: self.all_metrics.unwrap_or_default(),
207        }))
208    }
209
210    fn input(&self) -> Input {
211        Input::log()
212    }
213
214    fn outputs(
215        &self,
216        _: vector_lib::enrichment::TableRegistry,
217        _: &[(OutputId, schema::Definition)],
218        _: LogNamespace,
219    ) -> Vec<TransformOutput> {
220        // Converting the log to a metric means we lose all incoming `Definition`s.
221        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
222    }
223
224    fn enable_concurrency(&self) -> bool {
225        true
226    }
227}
228
229/// Kinds of TranformError for Parsing
230#[configurable_component]
231#[derive(Clone, Debug)]
232pub enum TransformParseErrorKind {
233    ///  Error when Parsing a Float
234    FloatError,
235    ///  Error when Parsing an Int
236    IntError,
237    /// Errors when Parsing Arrays
238    ArrayError,
239}
240
241impl std::fmt::Display for TransformParseErrorKind {
242    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
243        write!(f, "{self:?}")
244    }
245}
246
247enum TransformError {
248    PathNotFound {
249        path: String,
250    },
251    PathNull {
252        path: String,
253    },
254    MetricDetailsNotFound,
255    MetricValueError {
256        path: String,
257        path_value: String,
258    },
259    ParseError {
260        path: String,
261        kind: TransformParseErrorKind,
262    },
263    ParseFloatError {
264        path: String,
265        error: ParseFloatError,
266    },
267    TemplateRenderingError(TemplateRenderingError),
268    PairExpansionError {
269        key: String,
270        value: String,
271        error: serde_json::Error,
272    },
273}
274
275fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
276    template
277        .render_string(event)
278        .map_err(TransformError::TemplateRenderingError)
279}
280
281fn render_tags(
282    tags: &Option<IndexMap<Template, TagConfig>>,
283    event: &Event,
284) -> Result<Option<MetricTags>, TransformError> {
285    let mut static_tags: HashMap<String, String> = HashMap::new();
286    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
287    Ok(match tags {
288        None => None,
289        Some(tags) => {
290            let mut result = MetricTags::default();
291            for (name, config) in tags {
292                match config {
293                    TagConfig::Plain(template) => {
294                        render_tag_into(
295                            event,
296                            name,
297                            template.as_ref(),
298                            &mut result,
299                            &mut static_tags,
300                            &mut dynamic_tags,
301                        )?;
302                    }
303                    TagConfig::Multi(vec) => {
304                        for template in vec {
305                            render_tag_into(
306                                event,
307                                name,
308                                template.as_ref(),
309                                &mut result,
310                                &mut static_tags,
311                                &mut dynamic_tags,
312                            )?;
313                        }
314                    }
315                }
316            }
317            for (k, v) in static_tags {
318                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
319                    warn!(
320                        "Static tags overrides dynamic tags. \
321                key: {}, value: {:?}, discarded value: {:?}",
322                        k, v, discarded_v
323                    );
324                };
325            }
326            result.as_option()
327        }
328    })
329}
330
331fn render_tag_into(
332    event: &Event,
333    key_template: &Template,
334    value_template: Option<&Template>,
335    result: &mut MetricTags,
336    static_tags: &mut HashMap<String, String>,
337    dynamic_tags: &mut HashMap<String, String>,
338) -> Result<(), TransformError> {
339    let key = match render_template(key_template, event) {
340        Ok(key_s) => key_s,
341        Err(TransformError::TemplateRenderingError(err)) => {
342            emit!(crate::internal_events::TemplateRenderingError {
343                error: err,
344                drop_event: false,
345                field: Some(key_template.get_ref()),
346            });
347            return Ok(());
348        }
349        Err(err) => return Err(err),
350    };
351    match value_template {
352        None => {
353            result.insert(key, TagValue::Bare);
354        }
355        Some(template) => match render_template(template, event) {
356            Ok(value) => {
357                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
358                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
359                result.extend(expanded_pairs);
360            }
361            Err(TransformError::TemplateRenderingError(value_error)) => {
362                emit!(crate::internal_events::TemplateRenderingError {
363                    error: value_error,
364                    drop_event: false,
365                    field: Some(template.get_ref()),
366                });
367                return Ok(());
368            }
369            Err(other) => return Err(other),
370        },
371    };
372    Ok(())
373}
374
375fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
376    let log = event.as_log();
377
378    let timestamp = log
379        .get_timestamp()
380        .and_then(Value::as_timestamp)
381        .cloned()
382        .or_else(|| Some(Utc::now()));
383
384    // Assign the OriginService for the new metric
385    let metadata = event
386        .metadata()
387        .clone()
388        .with_schema_definition(&Arc::new(Definition::any()))
389        .with_origin_metadata(DatadogMetricOriginMetadata::new(
390            None,
391            None,
392            Some(ORIGIN_SERVICE_VALUE),
393        ));
394
395    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
396        path: config.field().to_string(),
397    })?;
398
399    let value = match log.get(&field) {
400        None => Err(TransformError::PathNotFound {
401            path: field.to_string(),
402        }),
403        Some(Value::Null) => Err(TransformError::PathNull {
404            path: field.to_string(),
405        }),
406        Some(value) => Ok(value),
407    }?;
408
409    let name = config.name.as_ref().unwrap_or(&config.field);
410    let name = render_template(name, event)?;
411
412    let namespace = config.namespace.as_ref();
413    let namespace = namespace
414        .map(|namespace| render_template(namespace, event))
415        .transpose()?;
416
417    let tags = render_tags(&config.tags, event)?;
418
419    let (kind, value) = match &config.metric {
420        MetricTypeConfig::Counter(counter) => {
421            let value = if counter.increment_by_value {
422                value.to_string_lossy().parse().map_err(|error| {
423                    TransformError::ParseFloatError {
424                        path: config.field.get_ref().to_owned(),
425                        error,
426                    }
427                })?
428            } else {
429                1.0
430            };
431
432            (counter.kind, MetricValue::Counter { value })
433        }
434        MetricTypeConfig::Histogram => {
435            let value = value.to_string_lossy().parse().map_err(|error| {
436                TransformError::ParseFloatError {
437                    path: field.to_string(),
438                    error,
439                }
440            })?;
441
442            (
443                MetricKind::Incremental,
444                MetricValue::Distribution {
445                    samples: vector_lib::samples![value => 1],
446                    statistic: StatisticKind::Histogram,
447                },
448            )
449        }
450        MetricTypeConfig::Summary => {
451            let value = value.to_string_lossy().parse().map_err(|error| {
452                TransformError::ParseFloatError {
453                    path: field.to_string(),
454                    error,
455                }
456            })?;
457
458            (
459                MetricKind::Incremental,
460                MetricValue::Distribution {
461                    samples: vector_lib::samples![value => 1],
462                    statistic: StatisticKind::Summary,
463                },
464            )
465        }
466        MetricTypeConfig::Gauge => {
467            let value = value.to_string_lossy().parse().map_err(|error| {
468                TransformError::ParseFloatError {
469                    path: field.to_string(),
470                    error,
471                }
472            })?;
473
474            (MetricKind::Absolute, MetricValue::Gauge { value })
475        }
476        MetricTypeConfig::Set => {
477            let value = value.to_string_lossy().into_owned();
478
479            (
480                MetricKind::Incremental,
481                MetricValue::Set {
482                    values: std::iter::once(value).collect(),
483                },
484            )
485        }
486    };
487    Ok(Metric::new_with_metadata(name, kind, value, metadata)
488        .with_namespace(namespace)
489        .with_tags(tags)
490        .with_timestamp(timestamp))
491}
492
493fn bytes_to_str(value: &Value) -> Option<String> {
494    match value {
495        Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
496        _ => None,
497    }
498}
499
500fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
501    // TODO: update returned errors after `TransformError` is refactored.
502    let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
503        PathParseError::InvalidPathSyntax { path } => PathNotFound {
504            path: path.to_string(),
505        },
506    })?;
507    match maybe_value {
508        None => Err(PathNotFound {
509            path: path.to_string(),
510        }),
511        Some(v) => Ok(bytes_to_str(v)),
512    }
513}
514
515fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
516    let counter_value = log
517        .get(event_path!("counter", "value"))
518        .ok_or_else(|| TransformError::PathNotFound {
519            path: "counter.value".to_string(),
520        })?
521        .as_float()
522        .ok_or_else(|| TransformError::ParseError {
523            path: "counter.value".to_string(),
524            kind: TransformParseErrorKind::FloatError,
525        })?;
526
527    Ok(MetricValue::Counter {
528        value: *counter_value,
529    })
530}
531
532fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
533    let gauge_value = log
534        .get(event_path!("gauge", "value"))
535        .ok_or_else(|| TransformError::PathNotFound {
536            path: "gauge.value".to_string(),
537        })?
538        .as_float()
539        .ok_or_else(|| TransformError::ParseError {
540            path: "gauge.value".to_string(),
541            kind: TransformParseErrorKind::FloatError,
542        })?;
543    Ok(MetricValue::Gauge {
544        value: *gauge_value,
545    })
546}
547
548fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
549    let set_values = log
550        .get(event_path!("set", "values"))
551        .ok_or_else(|| TransformError::PathNotFound {
552            path: "set.values".to_string(),
553        })?
554        .as_array()
555        .ok_or_else(|| TransformError::ParseError {
556            path: "set.values".to_string(),
557            kind: TransformParseErrorKind::ArrayError,
558        })?;
559
560    let mut values: Vec<String> = Vec::new();
561    for e_value in set_values {
562        let value = e_value
563            .as_bytes()
564            .ok_or_else(|| TransformError::ParseError {
565                path: "set.values".to_string(),
566                kind: TransformParseErrorKind::ArrayError,
567            })?;
568        values.push(String::from_utf8_lossy(value).to_string());
569    }
570
571    Ok(MetricValue::Set {
572        values: values.into_iter().collect(),
573    })
574}
575
576fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
577    let event_samples = log
578        .get(event_path!("distribution", "samples"))
579        .ok_or_else(|| TransformError::PathNotFound {
580            path: "distribution.samples".to_string(),
581        })?
582        .as_array()
583        .ok_or_else(|| TransformError::ParseError {
584            path: "distribution.samples".to_string(),
585            kind: TransformParseErrorKind::ArrayError,
586        })?;
587
588    let mut samples: Vec<Sample> = Vec::new();
589    for e_sample in event_samples {
590        let value = e_sample
591            .get(path!("value"))
592            .ok_or_else(|| TransformError::PathNotFound {
593                path: "value".to_string(),
594            })?
595            .as_float()
596            .ok_or_else(|| TransformError::ParseError {
597                path: "value".to_string(),
598                kind: TransformParseErrorKind::FloatError,
599            })?;
600
601        let rate = e_sample
602            .get(path!("rate"))
603            .ok_or_else(|| TransformError::PathNotFound {
604                path: "rate".to_string(),
605            })?
606            .as_integer()
607            .ok_or_else(|| TransformError::ParseError {
608                path: "rate".to_string(),
609                kind: TransformParseErrorKind::IntError,
610            })?;
611
612        samples.push(Sample {
613            value: *value,
614            rate: rate as u32,
615        });
616    }
617
618    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
619        Some(n) => n,
620        None => {
621            return Err(TransformError::PathNotFound {
622                path: "distribution.statistic".to_string(),
623            });
624        }
625    };
626    let statistic_kind = match statistic_str.as_str() {
627        "histogram" => Ok(StatisticKind::Histogram),
628        "summary" => Ok(StatisticKind::Summary),
629        _ => Err(TransformError::MetricValueError {
630            path: "distribution.statistic".to_string(),
631            path_value: statistic_str.to_string(),
632        }),
633    }?;
634
635    Ok(MetricValue::Distribution {
636        samples,
637        statistic: statistic_kind,
638    })
639}
640
641fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
642    let event_buckets = log
643        .get(event_path!("histogram", "buckets"))
644        .ok_or_else(|| TransformError::PathNotFound {
645            path: "histogram.buckets".to_string(),
646        })?
647        .as_array()
648        .ok_or_else(|| TransformError::ParseError {
649            path: "histogram.buckets".to_string(),
650            kind: TransformParseErrorKind::ArrayError,
651        })?;
652
653    let mut buckets: Vec<Bucket> = Vec::new();
654    for e_bucket in event_buckets {
655        let upper_limit = e_bucket
656            .get(path!("upper_limit"))
657            .ok_or_else(|| TransformError::PathNotFound {
658                path: "histogram.buckets.upper_limit".to_string(),
659            })?
660            .as_float()
661            .ok_or_else(|| TransformError::ParseError {
662                path: "histogram.buckets.upper_limit".to_string(),
663                kind: TransformParseErrorKind::FloatError,
664            })?;
665
666        let count = e_bucket
667            .get(path!("count"))
668            .ok_or_else(|| TransformError::PathNotFound {
669                path: "histogram.buckets.count".to_string(),
670            })?
671            .as_integer()
672            .ok_or_else(|| TransformError::ParseError {
673                path: "histogram.buckets.count".to_string(),
674                kind: TransformParseErrorKind::IntError,
675            })?;
676
677        buckets.push(Bucket {
678            upper_limit: *upper_limit,
679            count: count as u64,
680        });
681    }
682
683    let count = log
684        .get(event_path!("histogram", "count"))
685        .ok_or_else(|| TransformError::PathNotFound {
686            path: "histogram.count".to_string(),
687        })?
688        .as_integer()
689        .ok_or_else(|| TransformError::ParseError {
690            path: "histogram.count".to_string(),
691            kind: TransformParseErrorKind::IntError,
692        })?;
693
694    let sum = log
695        .get(event_path!("histogram", "sum"))
696        .ok_or_else(|| TransformError::PathNotFound {
697            path: "histogram.sum".to_string(),
698        })?
699        .as_float()
700        .ok_or_else(|| TransformError::ParseError {
701            path: "histogram.sum".to_string(),
702            kind: TransformParseErrorKind::FloatError,
703        })?;
704
705    Ok(MetricValue::AggregatedHistogram {
706        buckets,
707        count: count as u64,
708        sum: *sum,
709    })
710}
711
712fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
713    let event_quantiles = log
714        .get(event_path!("summary", "quantiles"))
715        .ok_or_else(|| TransformError::PathNotFound {
716            path: "summary.quantiles".to_string(),
717        })?
718        .as_array()
719        .ok_or_else(|| TransformError::ParseError {
720            path: "summary.quantiles".to_string(),
721            kind: TransformParseErrorKind::ArrayError,
722        })?;
723
724    let mut quantiles: Vec<Quantile> = Vec::new();
725    for e_quantile in event_quantiles {
726        let quantile = e_quantile
727            .get(path!("quantile"))
728            .ok_or_else(|| TransformError::PathNotFound {
729                path: "summary.quantiles.quantile".to_string(),
730            })?
731            .as_float()
732            .ok_or_else(|| TransformError::ParseError {
733                path: "summary.quantiles.quantile".to_string(),
734                kind: TransformParseErrorKind::FloatError,
735            })?;
736
737        let value = e_quantile
738            .get(path!("value"))
739            .ok_or_else(|| TransformError::PathNotFound {
740                path: "summary.quantiles.value".to_string(),
741            })?
742            .as_float()
743            .ok_or_else(|| TransformError::ParseError {
744                path: "summary.quantiles.value".to_string(),
745                kind: TransformParseErrorKind::FloatError,
746            })?;
747
748        quantiles.push(Quantile {
749            quantile: *quantile,
750            value: *value,
751        })
752    }
753
754    let count = log
755        .get(event_path!("summary", "count"))
756        .ok_or_else(|| TransformError::PathNotFound {
757            path: "summary.count".to_string(),
758        })?
759        .as_integer()
760        .ok_or_else(|| TransformError::ParseError {
761            path: "summary.count".to_string(),
762            kind: TransformParseErrorKind::IntError,
763        })?;
764
765    let sum = log
766        .get(event_path!("summary", "sum"))
767        .ok_or_else(|| TransformError::PathNotFound {
768            path: "summary.sum".to_string(),
769        })?
770        .as_float()
771        .ok_or_else(|| TransformError::ParseError {
772            path: "summary.sum".to_string(),
773            kind: TransformParseErrorKind::FloatError,
774        })?;
775
776    Ok(MetricValue::AggregatedSummary {
777        quantiles,
778        count: count as u64,
779        sum: *sum,
780    })
781}
782
783fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
784    let log = event.as_log();
785    let timestamp = log
786        .get_timestamp()
787        .and_then(Value::as_timestamp)
788        .cloned()
789        .or_else(|| Some(Utc::now()));
790
791    let name = match try_get_string_from_log(log, "name")? {
792        Some(n) => n,
793        None => {
794            return Err(TransformError::PathNotFound {
795                path: "name".to_string(),
796            });
797        }
798    };
799
800    let mut tags = MetricTags::default();
801
802    if let Some(els) = log.get(event_path!("tags"))
803        && let Some(el) = els.as_object()
804    {
805        for (key, value) in el {
806            tags.insert(key.to_string(), bytes_to_str(value));
807        }
808    }
809    let tags_result = Some(tags);
810
811    let kind_str = match try_get_string_from_log(log, "kind")? {
812        Some(n) => n,
813        None => {
814            return Err(TransformError::PathNotFound {
815                path: "kind".to_string(),
816            });
817        }
818    };
819
820    let kind = match kind_str.as_str() {
821        "absolute" => Ok(MetricKind::Absolute),
822        "incremental" => Ok(MetricKind::Incremental),
823        value => Err(TransformError::MetricValueError {
824            path: "kind".to_string(),
825            path_value: value.to_string(),
826        }),
827    }?;
828
829    let mut value: Option<MetricValue> = None;
830    if let Some(root_event) = log.as_map() {
831        for key in root_event.keys() {
832            value = match key.as_str() {
833                "gauge" => Some(get_gauge_value(log)?),
834                "distribution" => Some(get_distribution_value(log)?),
835                "histogram" => Some(get_histogram_value(log)?),
836                "summary" => Some(get_summary_value(log)?),
837                "counter" => Some(get_counter_value(log)?),
838                "set" => Some(get_set_value(log)?),
839                _ => None,
840            };
841
842            if value.is_some() {
843                break;
844            }
845        }
846    }
847
848    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
849
850    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
851        .with_tags(tags_result)
852        .with_timestamp(timestamp);
853
854    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
855        metric = metric.with_namespace(namespace);
856    }
857
858    Ok(metric)
859}
860
861impl FunctionTransform for LogToMetric {
862    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
863        // Metrics are "all or none" for a specific log. If a single fails, none are produced.
864        let mut buffer = Vec::with_capacity(self.metrics.len());
865        if self.all_metrics {
866            match to_metrics(&event) {
867                Ok(metric) => {
868                    output.push(Event::Metric(metric));
869                }
870                Err(err) => {
871                    match err {
872                        TransformError::MetricValueError { path, path_value } => {
873                            emit!(MetricMetadataInvalidFieldValueError {
874                                field: path.as_ref(),
875                                field_value: path_value.as_ref()
876                            })
877                        }
878                        TransformError::PathNotFound { path } => {
879                            emit!(ParserMissingFieldError::<DROP_EVENT> {
880                                field: path.as_ref()
881                            })
882                        }
883                        TransformError::ParseError { path, kind } => {
884                            emit!(MetricMetadataParseError {
885                                field: path.as_ref(),
886                                kind: &kind.to_string(),
887                            })
888                        }
889                        TransformError::MetricDetailsNotFound => {
890                            emit!(MetricMetadataMetricDetailsNotFoundError {})
891                        }
892                        TransformError::PairExpansionError { key, value, error } => {
893                            emit!(crate::internal_events::PairExpansionError {
894                                key: &key,
895                                value: &value,
896                                drop_event: true,
897                                error
898                            })
899                        }
900                        _ => {}
901                    };
902                }
903            }
904        } else {
905            for config in self.metrics.iter() {
906                match to_metric_with_config(config, &event) {
907                    Ok(metric) => {
908                        buffer.push(Event::Metric(metric));
909                    }
910                    Err(err) => {
911                        match err {
912                            TransformError::PathNull { path } => {
913                                emit!(LogToMetricFieldNullError {
914                                    field: path.as_ref()
915                                })
916                            }
917                            TransformError::PathNotFound { path } => {
918                                emit!(ParserMissingFieldError::<DROP_EVENT> {
919                                    field: path.as_ref()
920                                })
921                            }
922                            TransformError::ParseFloatError { path, error } => {
923                                emit!(LogToMetricParseFloatError {
924                                    field: path.as_ref(),
925                                    error
926                                })
927                            }
928                            TransformError::TemplateRenderingError(error) => {
929                                emit!(crate::internal_events::TemplateRenderingError {
930                                    error,
931                                    drop_event: true,
932                                    field: None,
933                                })
934                            }
935                            TransformError::PairExpansionError { key, value, error } => {
936                                emit!(crate::internal_events::PairExpansionError {
937                                    key: &key,
938                                    value: &value,
939                                    drop_event: true,
940                                    error
941                                })
942                            }
943                            _ => {}
944                        };
945                        // early return to prevent the partial buffer from being sent
946                        return;
947                    }
948                }
949            }
950        }
951
952        // Metric generation was successful, publish them all.
953        for event in buffer {
954            output.push(event);
955        }
956    }
957}
958
959#[cfg(test)]
960mod tests {
961    use std::{sync::Arc, time::Duration};
962
963    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
964    use tokio::sync::mpsc;
965    use tokio_stream::wrappers::ReceiverStream;
966    use vector_lib::{
967        config::ComponentKey,
968        event::{EventMetadata, ObjectMap},
969        metric_tags,
970    };
971
972    use super::*;
973    use crate::{
974        config::log_schema,
975        event::{
976            Event, LogEvent,
977            metric::{Metric, MetricKind, MetricValue, StatisticKind},
978        },
979        test_util::components::assert_transform_compliance,
980        transforms::test::create_topology,
981    };
982
983    #[test]
984    fn generate_config() {
985        crate::test_util::test_generate_config::<LogToMetricConfig>();
986    }
987
988    fn parse_config(s: &str) -> LogToMetricConfig {
989        toml::from_str(s).unwrap()
990    }
991
992    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
993        serde_yaml::from_str(s).unwrap()
994    }
995
996    fn ts() -> DateTime<Utc> {
997        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
998            .single()
999            .and_then(|t| t.with_nanosecond(11))
1000            .expect("invalid timestamp")
1001    }
1002
1003    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1004        let mut log = Event::Log(LogEvent::from("i am a log"));
1005        log.as_mut_log().insert(key, value);
1006        log.as_mut_log()
1007            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1008        log
1009    }
1010
1011    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1012        assert_transform_compliance(async move {
1013            let (tx, rx) = mpsc::channel(1);
1014            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1015            tx.send(event).await.unwrap();
1016            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1017                .await
1018                .unwrap_or(None);
1019            drop(tx);
1020            topology.stop().await;
1021            assert_eq!(out.recv().await, None);
1022            result
1023        })
1024        .await
1025    }
1026
1027    async fn do_transform_multiple_events(
1028        config: LogToMetricConfig,
1029        event: Event,
1030        count: usize,
1031    ) -> Vec<Event> {
1032        assert_transform_compliance(async move {
1033            let (tx, rx) = mpsc::channel(1);
1034            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1035            tx.send(event).await.unwrap();
1036
1037            let mut results = vec![];
1038            for _ in 0..count {
1039                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1040                    .await
1041                    .unwrap_or(None);
1042                if let Some(event) = result {
1043                    results.push(event);
1044                }
1045            }
1046
1047            drop(tx);
1048            topology.stop().await;
1049            assert_eq!(out.recv().await, None);
1050            results
1051        })
1052        .await
1053    }
1054
1055    #[tokio::test]
1056    async fn count_http_status_codes() {
1057        let config = parse_config(
1058            r#"
1059            [[metrics]]
1060            type = "counter"
1061            field = "status"
1062            "#,
1063        );
1064
1065        let event = create_event("status", "42");
1066        let mut metadata =
1067            event
1068                .metadata()
1069                .clone()
1070                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1071                    None,
1072                    None,
1073                    Some(ORIGIN_SERVICE_VALUE),
1074                ));
1075        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1076        metadata.set_schema_definition(&Arc::new(Definition::any()));
1077        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1078        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1079        let metric = do_transform(config, event).await.unwrap();
1080
1081        assert_eq!(
1082            metric.into_metric(),
1083            Metric::new_with_metadata(
1084                "status",
1085                MetricKind::Incremental,
1086                MetricValue::Counter { value: 1.0 },
1087                metadata,
1088            )
1089            .with_timestamp(Some(ts()))
1090        );
1091    }
1092
1093    #[tokio::test]
1094    async fn count_http_requests_with_tags() {
1095        let config = parse_config(
1096            r#"
1097            [[metrics]]
1098            type = "counter"
1099            field = "message"
1100            name = "http_requests_total"
1101            namespace = "app"
1102            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1103            "#,
1104        );
1105
1106        let mut event = create_event("message", "i am log");
1107        event.as_mut_log().insert("method", "post");
1108        event.as_mut_log().insert("code", "200");
1109        let mut metadata =
1110            event
1111                .metadata()
1112                .clone()
1113                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1114                    None,
1115                    None,
1116                    Some(ORIGIN_SERVICE_VALUE),
1117                ));
1118        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1119        metadata.set_schema_definition(&Arc::new(Definition::any()));
1120        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1121        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1122
1123        let metric = do_transform(config, event).await.unwrap();
1124
1125        assert_eq!(
1126            metric.into_metric(),
1127            Metric::new_with_metadata(
1128                "http_requests_total",
1129                MetricKind::Incremental,
1130                MetricValue::Counter { value: 1.0 },
1131                metadata,
1132            )
1133            .with_namespace(Some("app"))
1134            .with_tags(Some(metric_tags!(
1135                "method" => "post",
1136                "code" => "200",
1137                "host" => "localhost",
1138            )))
1139            .with_timestamp(Some(ts()))
1140        );
1141    }
1142
1143    #[tokio::test]
1144    async fn count_http_requests_with_tags_expansion() {
1145        let config = parse_config(
1146            r#"
1147            [[metrics]]
1148            type = "counter"
1149            field = "message"
1150            name = "http_requests_total"
1151            namespace = "app"
1152            tags = {"*" = "{{ dict }}"}
1153            "#,
1154        );
1155
1156        let mut event = create_event("message", "i am log");
1157        let log = event.as_mut_log();
1158
1159        let mut test_dict = ObjectMap::default();
1160        test_dict.insert("one".into(), Value::from("foo"));
1161        test_dict.insert("two".into(), Value::from("baz"));
1162        log.insert("dict", Value::from(test_dict));
1163
1164        let mut metadata =
1165            event
1166                .metadata()
1167                .clone()
1168                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1169                    None,
1170                    None,
1171                    Some(ORIGIN_SERVICE_VALUE),
1172                ));
1173        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1174        metadata.set_schema_definition(&Arc::new(Definition::any()));
1175        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1176        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1177
1178        let metric = do_transform(config, event).await.unwrap();
1179
1180        assert_eq!(
1181            metric.into_metric(),
1182            Metric::new_with_metadata(
1183                "http_requests_total",
1184                MetricKind::Incremental,
1185                MetricValue::Counter { value: 1.0 },
1186                metadata,
1187            )
1188            .with_namespace(Some("app"))
1189            .with_tags(Some(metric_tags!(
1190                "one" => "foo",
1191                "two" => "baz",
1192            )))
1193            .with_timestamp(Some(ts()))
1194        );
1195    }
1196    #[tokio::test]
1197    async fn count_http_requests_with_colliding_dynamic_tags() {
1198        let config = parse_config(
1199            r#"
1200            [[metrics]]
1201            type = "counter"
1202            field = "message"
1203            name = "http_requests_total"
1204            namespace = "app"
1205            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1206            "#,
1207        );
1208
1209        let mut event = create_event("message", "i am log");
1210        let log = event.as_mut_log();
1211
1212        let mut map1 = ObjectMap::default();
1213        map1.insert("key1".into(), Value::from("val1"));
1214        log.insert("map1", Value::from(map1));
1215
1216        let mut map2 = ObjectMap::default();
1217        map2.insert("l1_key1".into(), Value::from("val2"));
1218        log.insert("map2", Value::from(map2));
1219
1220        let mut metadata =
1221            event
1222                .metadata()
1223                .clone()
1224                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1225                    None,
1226                    None,
1227                    Some(ORIGIN_SERVICE_VALUE),
1228                ));
1229        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1230        metadata.set_schema_definition(&Arc::new(Definition::any()));
1231        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1232        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1233
1234        let metric = do_transform(config, event).await.unwrap().into_metric();
1235        let tags = metric.tags().expect("Metric should have tags");
1236
1237        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1238
1239        assert_eq!(tags.iter_all().count(), 2);
1240        for (name, value) in tags.iter_all() {
1241            assert_eq!(name, "l1_key1");
1242            assert!(value == Some("val1") || value == Some("val2"));
1243        }
1244    }
1245    #[tokio::test]
1246    async fn multi_value_tags_yaml() {
1247        // Have to use YAML to represent bare tags
1248        let config = parse_yaml_config(
1249            r#"
1250            metrics:
1251            - field: "message"
1252              type: "counter"
1253              tags:
1254                tag:
1255                - "one"
1256                - null
1257                - "two"
1258            "#,
1259        );
1260
1261        let event = create_event("message", "I am log");
1262        let metric = do_transform(config, event).await.unwrap().into_metric();
1263        let tags = metric.tags().expect("Metric should have tags");
1264
1265        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1266
1267        assert_eq!(tags.iter_all().count(), 3);
1268        for (name, value) in tags.iter_all() {
1269            assert_eq!(name, "tag");
1270            assert!(value.is_none() || value == Some("one") || value == Some("two"));
1271        }
1272    }
1273    #[tokio::test]
1274    async fn multi_value_tags_expansion_yaml() {
1275        // Have to use YAML to represent bare tags
1276        let config = parse_yaml_config(
1277            r#"
1278            metrics:
1279            - field: "message"
1280              type: "counter"
1281              tags:
1282                "*": "{{dict}}"
1283            "#,
1284        );
1285
1286        let mut event = create_event("message", "I am log");
1287        let log = event.as_mut_log();
1288
1289        let mut test_dict = ObjectMap::default();
1290        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1291        log.insert("dict", Value::from(test_dict));
1292
1293        let metric = do_transform(config, event).await.unwrap().into_metric();
1294        let tags = metric.tags().expect("Metric should have tags");
1295
1296        assert_eq!(
1297            tags.iter_single().collect::<Vec<_>>(),
1298            vec![("one", "[\"foo\",\"baz\"]")]
1299        );
1300
1301        assert_eq!(tags.iter_all().count(), 1);
1302        for (name, value) in tags.iter_all() {
1303            assert_eq!(name, "one");
1304            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1305        }
1306    }
1307
1308    #[tokio::test]
1309    async fn multi_value_tags_toml() {
1310        let config = parse_config(
1311            r#"
1312            [[metrics]]
1313            field = "message"
1314            type = "counter"
1315            [metrics.tags]
1316            tag = ["one", "two"]
1317            "#,
1318        );
1319
1320        let event = create_event("message", "I am log");
1321        let metric = do_transform(config, event).await.unwrap().into_metric();
1322        let tags = metric.tags().expect("Metric should have tags");
1323
1324        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1325
1326        assert_eq!(tags.iter_all().count(), 2);
1327        for (name, value) in tags.iter_all() {
1328            assert_eq!(name, "tag");
1329            assert!(value == Some("one") || value == Some("two"));
1330        }
1331    }
1332
1333    #[tokio::test]
1334    async fn count_exceptions() {
1335        let config = parse_config(
1336            r#"
1337            [[metrics]]
1338            type = "counter"
1339            field = "backtrace"
1340            name = "exception_total"
1341            "#,
1342        );
1343
1344        let event = create_event("backtrace", "message");
1345        let mut metadata =
1346            event
1347                .metadata()
1348                .clone()
1349                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1350                    None,
1351                    None,
1352                    Some(ORIGIN_SERVICE_VALUE),
1353                ));
1354        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1355        metadata.set_schema_definition(&Arc::new(Definition::any()));
1356        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1357        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1358
1359        let metric = do_transform(config, event).await.unwrap();
1360
1361        assert_eq!(
1362            metric.into_metric(),
1363            Metric::new_with_metadata(
1364                "exception_total",
1365                MetricKind::Incremental,
1366                MetricValue::Counter { value: 1.0 },
1367                metadata
1368            )
1369            .with_timestamp(Some(ts()))
1370        );
1371    }
1372
1373    #[tokio::test]
1374    async fn count_exceptions_no_match() {
1375        let config = parse_config(
1376            r#"
1377            [[metrics]]
1378            type = "counter"
1379            field = "backtrace"
1380            name = "exception_total"
1381            "#,
1382        );
1383
1384        let event = create_event("success", "42");
1385        assert_eq!(do_transform(config, event).await, None);
1386    }
1387
1388    #[tokio::test]
1389    async fn sum_order_amounts() {
1390        let config = parse_config(
1391            r#"
1392            [[metrics]]
1393            type = "counter"
1394            field = "amount"
1395            name = "amount_total"
1396            increment_by_value = true
1397            "#,
1398        );
1399
1400        let event = create_event("amount", "33.99");
1401        let mut metadata =
1402            event
1403                .metadata()
1404                .clone()
1405                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1406                    None,
1407                    None,
1408                    Some(ORIGIN_SERVICE_VALUE),
1409                ));
1410        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1411        metadata.set_schema_definition(&Arc::new(Definition::any()));
1412        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1413        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1414        let metric = do_transform(config, event).await.unwrap();
1415
1416        assert_eq!(
1417            metric.into_metric(),
1418            Metric::new_with_metadata(
1419                "amount_total",
1420                MetricKind::Incremental,
1421                MetricValue::Counter { value: 33.99 },
1422                metadata,
1423            )
1424            .with_timestamp(Some(ts()))
1425        );
1426    }
1427
1428    #[tokio::test]
1429    async fn count_absolute() {
1430        let config = parse_config(
1431            r#"
1432            [[metrics]]
1433            type = "counter"
1434            field = "amount"
1435            name = "amount_total"
1436            increment_by_value = true
1437            kind = "absolute"
1438            "#,
1439        );
1440
1441        let event = create_event("amount", "33.99");
1442        let mut metadata =
1443            event
1444                .metadata()
1445                .clone()
1446                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1447                    None,
1448                    None,
1449                    Some(ORIGIN_SERVICE_VALUE),
1450                ));
1451        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1452        metadata.set_schema_definition(&Arc::new(Definition::any()));
1453        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1454        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1455
1456        let metric = do_transform(config, event).await.unwrap();
1457
1458        assert_eq!(
1459            metric.into_metric(),
1460            Metric::new_with_metadata(
1461                "amount_total",
1462                MetricKind::Absolute,
1463                MetricValue::Counter { value: 33.99 },
1464                metadata,
1465            )
1466            .with_timestamp(Some(ts()))
1467        );
1468    }
1469
1470    #[tokio::test]
1471    async fn memory_usage_gauge() {
1472        let config = parse_config(
1473            r#"
1474            [[metrics]]
1475            type = "gauge"
1476            field = "memory_rss"
1477            name = "memory_rss_bytes"
1478            "#,
1479        );
1480
1481        let event = create_event("memory_rss", "123");
1482        let mut metadata =
1483            event
1484                .metadata()
1485                .clone()
1486                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1487                    None,
1488                    None,
1489                    Some(ORIGIN_SERVICE_VALUE),
1490                ));
1491
1492        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1493        metadata.set_schema_definition(&Arc::new(Definition::any()));
1494
1495        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1496        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1497
1498        let metric = do_transform(config, event).await.unwrap();
1499
1500        assert_eq!(
1501            metric.into_metric(),
1502            Metric::new_with_metadata(
1503                "memory_rss_bytes",
1504                MetricKind::Absolute,
1505                MetricValue::Gauge { value: 123.0 },
1506                metadata,
1507            )
1508            .with_timestamp(Some(ts()))
1509        );
1510    }
1511
1512    #[tokio::test]
1513    async fn parse_failure() {
1514        let config = parse_config(
1515            r#"
1516            [[metrics]]
1517            type = "counter"
1518            field = "status"
1519            name = "status_total"
1520            increment_by_value = true
1521            "#,
1522        );
1523
1524        let event = create_event("status", "not a number");
1525        assert_eq!(do_transform(config, event).await, None);
1526    }
1527
1528    #[tokio::test]
1529    async fn missing_field() {
1530        let config = parse_config(
1531            r#"
1532            [[metrics]]
1533            type = "counter"
1534            field = "status"
1535            name = "status_total"
1536            "#,
1537        );
1538
1539        let event = create_event("not foo", "not a number");
1540        assert_eq!(do_transform(config, event).await, None);
1541    }
1542
1543    #[tokio::test]
1544    async fn null_field() {
1545        let config = parse_config(
1546            r#"
1547            [[metrics]]
1548            type = "counter"
1549            field = "status"
1550            name = "status_total"
1551            "#,
1552        );
1553
1554        let event = create_event("status", Value::Null);
1555        assert_eq!(do_transform(config, event).await, None);
1556    }
1557
1558    #[tokio::test]
1559    async fn multiple_metrics() {
1560        let config = parse_config(
1561            r#"
1562            [[metrics]]
1563            type = "counter"
1564            field = "status"
1565
1566            [[metrics]]
1567            type = "counter"
1568            field = "backtrace"
1569            name = "exception_total"
1570            "#,
1571        );
1572
1573        let mut event = Event::Log(LogEvent::from("i am a log"));
1574        event
1575            .as_mut_log()
1576            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1577        event.as_mut_log().insert("status", "42");
1578        event.as_mut_log().insert("backtrace", "message");
1579        let mut metadata =
1580            event
1581                .metadata()
1582                .clone()
1583                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1584                    None,
1585                    None,
1586                    Some(ORIGIN_SERVICE_VALUE),
1587                ));
1588
1589        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1590        metadata.set_schema_definition(&Arc::new(Definition::any()));
1591        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1592        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1593
1594        let output = do_transform_multiple_events(config, event, 2).await;
1595
1596        assert_eq!(2, output.len());
1597        assert_eq!(
1598            output[0].clone().into_metric(),
1599            Metric::new_with_metadata(
1600                "status",
1601                MetricKind::Incremental,
1602                MetricValue::Counter { value: 1.0 },
1603                metadata.clone(),
1604            )
1605            .with_timestamp(Some(ts()))
1606        );
1607        assert_eq!(
1608            output[1].clone().into_metric(),
1609            Metric::new_with_metadata(
1610                "exception_total",
1611                MetricKind::Incremental,
1612                MetricValue::Counter { value: 1.0 },
1613                metadata,
1614            )
1615            .with_timestamp(Some(ts()))
1616        );
1617    }
1618
1619    #[tokio::test]
1620    async fn multiple_metrics_with_multiple_templates() {
1621        let config = parse_config(
1622            r#"
1623            [[metrics]]
1624            type = "set"
1625            field = "status"
1626            name = "{{host}}_{{worker}}_status_set"
1627
1628            [[metrics]]
1629            type = "counter"
1630            field = "backtrace"
1631            name = "{{service}}_exception_total"
1632            namespace = "{{host}}"
1633            "#,
1634        );
1635
1636        let mut event = Event::Log(LogEvent::from("i am a log"));
1637        event
1638            .as_mut_log()
1639            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1640        event.as_mut_log().insert("status", "42");
1641        event.as_mut_log().insert("backtrace", "message");
1642        event.as_mut_log().insert("host", "local");
1643        event.as_mut_log().insert("worker", "abc");
1644        event.as_mut_log().insert("service", "xyz");
1645        let mut metadata =
1646            event
1647                .metadata()
1648                .clone()
1649                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1650                    None,
1651                    None,
1652                    Some(ORIGIN_SERVICE_VALUE),
1653                ));
1654
1655        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1656        metadata.set_schema_definition(&Arc::new(Definition::any()));
1657        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1658        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1659
1660        let output = do_transform_multiple_events(config, event, 2).await;
1661
1662        assert_eq!(2, output.len());
1663        assert_eq!(
1664            output[0].as_metric(),
1665            &Metric::new_with_metadata(
1666                "local_abc_status_set",
1667                MetricKind::Incremental,
1668                MetricValue::Set {
1669                    values: vec!["42".into()].into_iter().collect()
1670                },
1671                metadata.clone(),
1672            )
1673            .with_timestamp(Some(ts()))
1674        );
1675        assert_eq!(
1676            output[1].as_metric(),
1677            &Metric::new_with_metadata(
1678                "xyz_exception_total",
1679                MetricKind::Incremental,
1680                MetricValue::Counter { value: 1.0 },
1681                metadata,
1682            )
1683            .with_namespace(Some("local"))
1684            .with_timestamp(Some(ts()))
1685        );
1686    }
1687
1688    #[tokio::test]
1689    async fn user_ip_set() {
1690        let config = parse_config(
1691            r#"
1692            [[metrics]]
1693            type = "set"
1694            field = "user_ip"
1695            name = "unique_user_ip"
1696            "#,
1697        );
1698
1699        let event = create_event("user_ip", "1.2.3.4");
1700        let mut metadata =
1701            event
1702                .metadata()
1703                .clone()
1704                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1705                    None,
1706                    None,
1707                    Some(ORIGIN_SERVICE_VALUE),
1708                ));
1709        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1710        metadata.set_schema_definition(&Arc::new(Definition::any()));
1711        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1712        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1713
1714        let metric = do_transform(config, event).await.unwrap();
1715
1716        assert_eq!(
1717            metric.into_metric(),
1718            Metric::new_with_metadata(
1719                "unique_user_ip",
1720                MetricKind::Incremental,
1721                MetricValue::Set {
1722                    values: vec!["1.2.3.4".into()].into_iter().collect()
1723                },
1724                metadata,
1725            )
1726            .with_timestamp(Some(ts()))
1727        );
1728    }
1729
1730    #[tokio::test]
1731    async fn response_time_histogram() {
1732        let config = parse_config(
1733            r#"
1734            [[metrics]]
1735            type = "histogram"
1736            field = "response_time"
1737            "#,
1738        );
1739
1740        let event = create_event("response_time", "2.5");
1741        let mut metadata =
1742            event
1743                .metadata()
1744                .clone()
1745                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1746                    None,
1747                    None,
1748                    Some(ORIGIN_SERVICE_VALUE),
1749                ));
1750
1751        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1752        metadata.set_schema_definition(&Arc::new(Definition::any()));
1753        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1754        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1755
1756        let metric = do_transform(config, event).await.unwrap();
1757
1758        assert_eq!(
1759            metric.into_metric(),
1760            Metric::new_with_metadata(
1761                "response_time",
1762                MetricKind::Incremental,
1763                MetricValue::Distribution {
1764                    samples: vector_lib::samples![2.5 => 1],
1765                    statistic: StatisticKind::Histogram
1766                },
1767                metadata
1768            )
1769            .with_timestamp(Some(ts()))
1770        );
1771    }
1772
1773    #[tokio::test]
1774    async fn response_time_summary() {
1775        let config = parse_config(
1776            r#"
1777            [[metrics]]
1778            type = "summary"
1779            field = "response_time"
1780            "#,
1781        );
1782
1783        let event = create_event("response_time", "2.5");
1784        let mut metadata =
1785            event
1786                .metadata()
1787                .clone()
1788                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1789                    None,
1790                    None,
1791                    Some(ORIGIN_SERVICE_VALUE),
1792                ));
1793
1794        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1795        metadata.set_schema_definition(&Arc::new(Definition::any()));
1796        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1797        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1798
1799        let metric = do_transform(config, event).await.unwrap();
1800
1801        assert_eq!(
1802            metric.into_metric(),
1803            Metric::new_with_metadata(
1804                "response_time",
1805                MetricKind::Incremental,
1806                MetricValue::Distribution {
1807                    samples: vector_lib::samples![2.5 => 1],
1808                    statistic: StatisticKind::Summary
1809                },
1810                metadata
1811            )
1812            .with_timestamp(Some(ts()))
1813        );
1814    }
1815
1816    //  Metric Metadata Tests
1817    //
1818    fn create_log_event(json_str: &str) -> Event {
1819        create_log_event_with_namespace(json_str, Some("test_namespace"))
1820    }
1821
1822    fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1823        let mut log_value: Value =
1824            serde_json::from_str(json_str).expect("JSON was not well-formatted");
1825        log_value.insert("timestamp", ts());
1826
1827        if let Some(namespace) = namespace {
1828            log_value.insert("namespace", namespace);
1829        }
1830
1831        let mut metadata = EventMetadata::default();
1832        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1833        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1834
1835        Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1836    }
1837
1838    #[tokio::test]
1839    async fn transform_gauge() {
1840        let config = LogToMetricConfig {
1841            metrics: None,
1842            all_metrics: Some(true),
1843        };
1844
1845        let json_str = r#"{
1846          "gauge": {
1847            "value": 990.0
1848          },
1849          "kind": "absolute",
1850          "name": "test.transform.gauge",
1851          "tags": {
1852            "env": "test_env",
1853            "host": "localhost"
1854          }
1855        }"#;
1856        let log = create_log_event(json_str);
1857        let metric = do_transform(config, log.clone()).await.unwrap();
1858        assert_eq!(
1859            *metric.as_metric(),
1860            Metric::new_with_metadata(
1861                "test.transform.gauge",
1862                MetricKind::Absolute,
1863                MetricValue::Gauge { value: 990.0 },
1864                metric.metadata().clone(),
1865            )
1866            .with_namespace(Some("test_namespace"))
1867            .with_tags(Some(metric_tags!(
1868                "env" => "test_env",
1869                "host" => "localhost",
1870            )))
1871            .with_timestamp(Some(ts()))
1872        );
1873    }
1874
1875    #[tokio::test]
1876    async fn transform_histogram() {
1877        let config = LogToMetricConfig {
1878            metrics: None,
1879            all_metrics: Some(true),
1880        };
1881
1882        let json_str = r#"{
1883          "histogram": {
1884            "sum": 18.0,
1885            "count": 5,
1886            "buckets": [
1887              {
1888                "upper_limit": 1.0,
1889                "count": 1
1890              },
1891              {
1892                "upper_limit": 2.0,
1893                "count": 2
1894              },
1895              {
1896                "upper_limit": 5.0,
1897                "count": 1
1898              },
1899              {
1900                "upper_limit": 10.0,
1901                "count": 1
1902              }
1903            ]
1904          },
1905          "kind": "absolute",
1906          "name": "test.transform.histogram",
1907          "tags": {
1908            "env": "test_env",
1909            "host": "localhost"
1910          }
1911        }"#;
1912        let log = create_log_event(json_str);
1913        let metric = do_transform(config, log.clone()).await.unwrap();
1914        assert_eq!(
1915            *metric.as_metric(),
1916            Metric::new_with_metadata(
1917                "test.transform.histogram",
1918                MetricKind::Absolute,
1919                MetricValue::AggregatedHistogram {
1920                    count: 5,
1921                    sum: 18.0,
1922                    buckets: vec![
1923                        Bucket {
1924                            upper_limit: 1.0,
1925                            count: 1,
1926                        },
1927                        Bucket {
1928                            upper_limit: 2.0,
1929                            count: 2,
1930                        },
1931                        Bucket {
1932                            upper_limit: 5.0,
1933                            count: 1,
1934                        },
1935                        Bucket {
1936                            upper_limit: 10.0,
1937                            count: 1,
1938                        },
1939                    ],
1940                },
1941                metric.metadata().clone(),
1942            )
1943            .with_namespace(Some("test_namespace"))
1944            .with_tags(Some(metric_tags!(
1945                "env" => "test_env",
1946                "host" => "localhost",
1947            )))
1948            .with_timestamp(Some(ts()))
1949        );
1950    }
1951
1952    #[tokio::test]
1953    async fn transform_distribution_histogram() {
1954        let config = LogToMetricConfig {
1955            metrics: None,
1956            all_metrics: Some(true),
1957        };
1958
1959        let json_str = r#"{
1960          "distribution": {
1961            "samples": [
1962              {
1963                "value": 1.0,
1964                "rate": 1
1965              },
1966              {
1967                "value": 2.0,
1968                "rate": 2
1969              }
1970            ],
1971            "statistic": "histogram"
1972          },
1973          "kind": "absolute",
1974          "name": "test.transform.distribution_histogram",
1975          "tags": {
1976            "env": "test_env",
1977            "host": "localhost"
1978          }
1979        }"#;
1980        let log = create_log_event(json_str);
1981        let metric = do_transform(config, log.clone()).await.unwrap();
1982        assert_eq!(
1983            *metric.as_metric(),
1984            Metric::new_with_metadata(
1985                "test.transform.distribution_histogram",
1986                MetricKind::Absolute,
1987                MetricValue::Distribution {
1988                    samples: vec![
1989                        Sample {
1990                            value: 1.0,
1991                            rate: 1
1992                        },
1993                        Sample {
1994                            value: 2.0,
1995                            rate: 2
1996                        },
1997                    ],
1998                    statistic: StatisticKind::Histogram,
1999                },
2000                metric.metadata().clone(),
2001            )
2002            .with_namespace(Some("test_namespace"))
2003            .with_tags(Some(metric_tags!(
2004                "env" => "test_env",
2005                "host" => "localhost",
2006            )))
2007            .with_timestamp(Some(ts()))
2008        );
2009    }
2010
2011    #[tokio::test]
2012    async fn transform_distribution_summary() {
2013        let config = LogToMetricConfig {
2014            metrics: None,
2015            all_metrics: Some(true),
2016        };
2017
2018        let json_str = r#"{
2019          "distribution": {
2020            "samples": [
2021              {
2022                "value": 1.0,
2023                "rate": 1
2024              },
2025              {
2026                "value": 2.0,
2027                "rate": 2
2028              }
2029            ],
2030            "statistic": "summary"
2031          },
2032          "kind": "absolute",
2033          "name": "test.transform.distribution_summary",
2034          "tags": {
2035            "env": "test_env",
2036            "host": "localhost"
2037          }
2038        }"#;
2039        let log = create_log_event(json_str);
2040        let metric = do_transform(config, log.clone()).await.unwrap();
2041        assert_eq!(
2042            *metric.as_metric(),
2043            Metric::new_with_metadata(
2044                "test.transform.distribution_summary",
2045                MetricKind::Absolute,
2046                MetricValue::Distribution {
2047                    samples: vec![
2048                        Sample {
2049                            value: 1.0,
2050                            rate: 1
2051                        },
2052                        Sample {
2053                            value: 2.0,
2054                            rate: 2
2055                        },
2056                    ],
2057                    statistic: StatisticKind::Summary,
2058                },
2059                metric.metadata().clone(),
2060            )
2061            .with_namespace(Some("test_namespace"))
2062            .with_tags(Some(metric_tags!(
2063                "env" => "test_env",
2064                "host" => "localhost",
2065            )))
2066            .with_timestamp(Some(ts()))
2067        );
2068    }
2069
2070    #[tokio::test]
2071    async fn transform_summary() {
2072        let config = LogToMetricConfig {
2073            metrics: None,
2074            all_metrics: Some(true),
2075        };
2076
2077        let json_str = r#"{
2078          "summary": {
2079            "sum": 100.0,
2080            "count": 7,
2081            "quantiles": [
2082              {
2083                "quantile": 0.05,
2084                "value": 10.0
2085              },
2086              {
2087                "quantile": 0.95,
2088                "value": 25.0
2089              }
2090            ]
2091          },
2092          "kind": "absolute",
2093          "name": "test.transform.histogram",
2094          "tags": {
2095            "env": "test_env",
2096            "host": "localhost"
2097          }
2098        }"#;
2099        let log = create_log_event(json_str);
2100        let metric = do_transform(config, log.clone()).await.unwrap();
2101        assert_eq!(
2102            *metric.as_metric(),
2103            Metric::new_with_metadata(
2104                "test.transform.histogram",
2105                MetricKind::Absolute,
2106                MetricValue::AggregatedSummary {
2107                    quantiles: vec![
2108                        Quantile {
2109                            quantile: 0.05,
2110                            value: 10.0,
2111                        },
2112                        Quantile {
2113                            quantile: 0.95,
2114                            value: 25.0,
2115                        },
2116                    ],
2117                    count: 7,
2118                    sum: 100.0,
2119                },
2120                metric.metadata().clone(),
2121            )
2122            .with_namespace(Some("test_namespace"))
2123            .with_tags(Some(metric_tags!(
2124                "env" => "test_env",
2125                "host" => "localhost",
2126            )))
2127            .with_timestamp(Some(ts()))
2128        );
2129    }
2130
2131    #[tokio::test]
2132    async fn transform_counter() {
2133        let config = LogToMetricConfig {
2134            metrics: None,
2135            all_metrics: Some(true),
2136        };
2137
2138        let json_str = r#"{
2139          "counter": {
2140            "value": 10.0
2141          },
2142          "kind": "incremental",
2143          "name": "test.transform.counter",
2144          "tags": {
2145            "env": "test_env",
2146            "host": "localhost"
2147          }
2148        }"#;
2149        let log = create_log_event(json_str);
2150        let metric = do_transform(config, log.clone()).await.unwrap();
2151        assert_eq!(
2152            *metric.as_metric(),
2153            Metric::new_with_metadata(
2154                "test.transform.counter",
2155                MetricKind::Incremental,
2156                MetricValue::Counter { value: 10.0 },
2157                metric.metadata().clone(),
2158            )
2159            .with_namespace(Some("test_namespace"))
2160            .with_tags(Some(metric_tags!(
2161                "env" => "test_env",
2162                "host" => "localhost",
2163            )))
2164            .with_timestamp(Some(ts()))
2165        );
2166    }
2167
2168    #[tokio::test]
2169    async fn transform_set() {
2170        let config = LogToMetricConfig {
2171            metrics: None,
2172            all_metrics: Some(true),
2173        };
2174
2175        let json_str = r#"{
2176          "set": {
2177            "values": ["990.0", "1234"]
2178          },
2179          "kind": "incremental",
2180          "name": "test.transform.set",
2181          "tags": {
2182            "env": "test_env",
2183            "host": "localhost"
2184          }
2185        }"#;
2186        let log = create_log_event(json_str);
2187        let metric = do_transform(config, log.clone()).await.unwrap();
2188        assert_eq!(
2189            *metric.as_metric(),
2190            Metric::new_with_metadata(
2191                "test.transform.set",
2192                MetricKind::Incremental,
2193                MetricValue::Set {
2194                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2195                },
2196                metric.metadata().clone(),
2197            )
2198            .with_namespace(Some("test_namespace"))
2199            .with_tags(Some(metric_tags!(
2200                "env" => "test_env",
2201                "host" => "localhost",
2202            )))
2203            .with_timestamp(Some(ts()))
2204        );
2205    }
2206
2207    #[tokio::test]
2208    async fn transform_all_metrics_optional_namespace() {
2209        let config = LogToMetricConfig {
2210            metrics: None,
2211            all_metrics: Some(true),
2212        };
2213
2214        let json_str = r#"{
2215          "counter": {
2216            "value": 10.0
2217          },
2218          "kind": "incremental",
2219          "name": "test.transform.counter",
2220          "tags": {
2221            "env": "test_env",
2222            "host": "localhost"
2223          }
2224        }"#;
2225        let log = create_log_event_with_namespace(json_str, None);
2226        let metric = do_transform(config, log.clone()).await.unwrap();
2227        assert_eq!(
2228            *metric.as_metric(),
2229            Metric::new_with_metadata(
2230                "test.transform.counter",
2231                MetricKind::Incremental,
2232                MetricValue::Counter { value: 10.0 },
2233                metric.metadata().clone(),
2234            )
2235            .with_tags(Some(metric_tags!(
2236                "env" => "test_env",
2237                "host" => "localhost",
2238            )))
2239            .with_timestamp(Some(ts()))
2240        );
2241    }
2242}