vector/transforms/
log_to_metric.rs

1use std::sync::Arc;
2use std::{collections::HashMap, num::ParseFloatError};
3
4use chrono::Utc;
5use indexmap::IndexMap;
6use vector_lib::configurable::configurable_component;
7use vector_lib::event::LogEvent;
8use vector_lib::{
9    config::LogNamespace,
10    event::DatadogMetricOriginMetadata,
11    event::{
12        metric::Sample,
13        metric::{Bucket, Quantile},
14    },
15};
16use vrl::path::{parse_target_path, PathParseError};
17use vrl::{event_path, path};
18
19use crate::config::schema::Definition;
20use crate::transforms::log_to_metric::TransformError::PathNotFound;
21use crate::{
22    common::expansion::pair_expansion,
23    config::{
24        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
25        TransformOutput,
26    },
27    event::{
28        metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
29        Event, Value,
30    },
31    internal_events::{
32        LogToMetricFieldNullError, LogToMetricParseFloatError,
33        MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
34        MetricMetadataParseError, ParserMissingFieldError, DROP_EVENT,
35    },
36    schema,
37    template::{Template, TemplateRenderingError},
38    transforms::{FunctionTransform, OutputBuffer, Transform},
39};
40
// Origin service identifier passed to `DatadogMetricOriginMetadata::new` for metrics that
// are built from a configured `metrics` entry (see `to_metric_with_config`).
// NOTE(review): presumably `3` is the identifier assigned to this transform in Datadog's
// origin-service numbering — confirm against the Datadog origin metadata definitions.
const ORIGIN_SERVICE_VALUE: u32 = 3;
42
/// Configuration for the `log_to_metric` transform.
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate.
    pub metrics: Option<Vec<MetricConfig>>,

    /// Setting this flag changes the behavior of this transformation.
    /// Notably the `metrics` field will be ignored.
    /// All incoming events will be processed and if possible they will be converted to metric events.
    /// Otherwise, only items specified in the `metrics` field will be processed.
    ///
    /// Example:
    /// <pre class="chroma"><code class="language-toml" data-lang="toml">{
    ///     "counter": {
    ///         "value": 10.0
    ///     },
    ///     "kind": "incremental",
    ///     "name": "test.transform.counter",
    ///     "tags": {
    ///         "env": "test_env",
    ///         "host": "localhost"
    ///     }
    /// }
    /// </code></pre>
    ///
    /// This is a JSON representation of a counter with the following properties:
    ///
    /// - `counter`: An object with a single property `value` representing the counter value (in this case, `10.0`).
    /// - `kind`: A string indicating the kind of counter, in this case, "incremental".
    /// - `name`: A string representing the name of the counter, here set to "test.transform.counter".
    /// - `tags`: An object containing additional tags such as "env" and "host".
    ///
    /// Objects that can be processed include counter, distribution, histogram, gauge, set and summary.
    pub all_metrics: Option<bool>,
}
80
/// Specification of a counter derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the value in `field`, instead of only by `1`.
    // When omitted from the configuration, serde fills this in with
    // `default_increment_by_value()`, i.e. `false`.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    // When omitted from the configuration, serde fills this in with `default_kind()`,
    // i.e. `MetricKind::Incremental`.
    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}
93
/// Specification of a metric derived from a log event.
// TODO: While we're resolving the schema for this enum somewhat reasonably (in
// `generate-components-docs.rb`), we have a problem where an overlapping field (overlap between two
// or more of the subschemas) takes the details of the last subschema to be iterated over that
// contains that field, such that, for example, the `Summary` variant below is overriding the
// description for almost all of the fields because they're shared across all of the variants.
// NOTE(review): the TODO above talks about an enum and a `Summary` variant, while this item is a
// struct; it presumably refers to the flattened `MetricTypeConfig` enum — confirm and relocate.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Name of the field in the event to generate the metric.
    pub field: Template,

    /// Overrides the name of the metric.
    ///
    /// If not specified, `field` is used as the name of the metric.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    ///
    /// Both keys and values can be templated, allowing you to attach dynamic tags to events.
    ///
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    // Flattened, so the `type` tag and any type-specific options of `MetricTypeConfig`
    // sit at the same level as the fields above in the serialized configuration.
    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}
125
/// Specification of the value of a created tag.
///
/// This may be a single value, a `null` for a bare tag, or an array of either.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    // `None` is rendered as `TagValue::Bare` by `render_tag_into`.
    Plain(Option<Template>),

    /// An array of values to give to the same tag name.
    // Each element is rendered independently under the same tag name; a `None`
    // element is rendered as `TagValue::Bare`.
    Multi(Vec<Option<Template>>),
}
139
/// Specification of the type of an individual metric, and any associated data.
// The notes on the variants below describe what `to_metric_with_config` emits for each type.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    // Emitted as `MetricValue::Counter` with the kind taken from `CounterConfig::kind`.
    Counter(CounterConfig),

    /// A histogram.
    // Emitted as an incremental `MetricValue::Distribution` holding one sample with
    // `StatisticKind::Histogram`.
    Histogram,

    /// A gauge.
    // Emitted as an absolute `MetricValue::Gauge`.
    Gauge,

    /// A set.
    // Emitted as an incremental `MetricValue::Set` containing the field's string form.
    Set,

    /// A summary.
    // Emitted as an incremental `MetricValue::Distribution` holding one sample with
    // `StatisticKind::Summary`.
    Summary,
}
161
impl MetricConfig {
    /// Returns the configured `field` template's underlying string (`Template::get_ref`),
    /// which `to_metric_with_config` parses as the target path of the value to read.
    fn field(&self) -> &str {
        self.field.get_ref()
    }
}
167
/// Serde default for `CounterConfig::increment_by_value`: `false`.
const fn default_increment_by_value() -> bool {
    false
}
171
/// Serde default for `CounterConfig::kind`: `MetricKind::Incremental`.
const fn default_kind() -> MetricKind {
    MetricKind::Incremental
}
175
/// Runtime state of the `log_to_metric` transform, built from `LogToMetricConfig`.
#[derive(Debug, Clone)]
pub struct LogToMetric {
    /// Metric specifications applied to every event when `all_metrics` is `false`
    /// (empty when the configuration omits `metrics`).
    pub metrics: Vec<MetricConfig>,
    /// When `true`, `metrics` is ignored and each event is converted with `to_metrics`
    /// (defaults to `false` when the configuration omits `all_metrics`).
    pub all_metrics: bool,
}
181
182impl GenerateConfig for LogToMetricConfig {
183    fn generate_config() -> toml::Value {
184        toml::Value::try_from(Self {
185            metrics: Some(vec![MetricConfig {
186                field: "field_name".try_into().expect("Fixed template"),
187                name: None,
188                namespace: None,
189                tags: None,
190                metric: MetricTypeConfig::Counter(CounterConfig {
191                    increment_by_value: false,
192                    kind: MetricKind::Incremental,
193                }),
194            }]),
195            all_metrics: Some(true),
196        })
197        .unwrap()
198    }
199}
200
201#[async_trait::async_trait]
202#[typetag::serde(name = "log_to_metric")]
203impl TransformConfig for LogToMetricConfig {
204    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
205        Ok(Transform::function(LogToMetric {
206            metrics: self.metrics.clone().unwrap_or_default(),
207            all_metrics: self.all_metrics.unwrap_or_default(),
208        }))
209    }
210
211    fn input(&self) -> Input {
212        Input::log()
213    }
214
215    fn outputs(
216        &self,
217        _: vector_lib::enrichment::TableRegistry,
218        _: &[(OutputId, schema::Definition)],
219        _: LogNamespace,
220    ) -> Vec<TransformOutput> {
221        // Converting the log to a metric means we lose all incoming `Definition`s.
222        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
223    }
224
225    fn enable_concurrency(&self) -> bool {
226        true
227    }
228}
229
/// Kinds of `TransformError` raised while parsing metric values.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// Error when parsing a float.
    FloatError,
    /// Error when parsing an integer.
    IntError,
    /// Error when parsing an array.
    ArrayError,
}
241
242impl std::fmt::Display for TransformParseErrorKind {
243    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
244        write!(f, "{self:?}")
245    }
246}
247
/// Reasons a log event could not be converted into a metric.
enum TransformError {
    /// The path could not be parsed, or no value exists at `path` in the log event.
    PathNotFound {
        path: String,
    },
    /// The value at `path` is `Value::Null`.
    PathNull {
        path: String,
    },
    /// In `all_metrics` mode, none of the root keys of the event named a supported
    /// metric value (`gauge`, `distribution`, `histogram`, `summary`, `counter`, `set`).
    MetricDetailsNotFound,
    /// The string at `path` (`path_value`) is not one of the accepted values,
    /// e.g. an unknown `kind` or `distribution.statistic`.
    MetricValueError {
        path: String,
        path_value: String,
    },
    /// The value at `path` does not have the type indicated by `kind`.
    ParseError {
        path: String,
        kind: TransformParseErrorKind,
    },
    /// The string form of the value at `path` could not be parsed as a float.
    ParseFloatError {
        path: String,
        error: ParseFloatError,
    },
    /// A template (name, namespace, or tag) failed to render against the event.
    TemplateRenderingError(TemplateRenderingError),
    /// `pair_expansion` failed for the rendered tag `key`/`value` pair.
    PairExpansionError {
        key: String,
        value: String,
        error: serde_json::Error,
    },
}
275
276fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
277    template
278        .render_string(event)
279        .map_err(TransformError::TemplateRenderingError)
280}
281
282fn render_tags(
283    tags: &Option<IndexMap<Template, TagConfig>>,
284    event: &Event,
285) -> Result<Option<MetricTags>, TransformError> {
286    let mut static_tags: HashMap<String, String> = HashMap::new();
287    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
288    Ok(match tags {
289        None => None,
290        Some(tags) => {
291            let mut result = MetricTags::default();
292            for (name, config) in tags {
293                match config {
294                    TagConfig::Plain(template) => {
295                        render_tag_into(
296                            event,
297                            name,
298                            template.as_ref(),
299                            &mut result,
300                            &mut static_tags,
301                            &mut dynamic_tags,
302                        )?;
303                    }
304                    TagConfig::Multi(vec) => {
305                        for template in vec {
306                            render_tag_into(
307                                event,
308                                name,
309                                template.as_ref(),
310                                &mut result,
311                                &mut static_tags,
312                                &mut dynamic_tags,
313                            )?;
314                        }
315                    }
316                }
317            }
318            for (k, v) in static_tags {
319                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
320                    warn!(
321                        "Static tags overrides dynamic tags. \
322                key: {}, value: {:?}, discarded value: {:?}",
323                        k, v, discarded_v
324                    );
325                };
326            }
327            result.as_option()
328        }
329    })
330}
331
332fn render_tag_into(
333    event: &Event,
334    key_template: &Template,
335    value_template: Option<&Template>,
336    result: &mut MetricTags,
337    static_tags: &mut HashMap<String, String>,
338    dynamic_tags: &mut HashMap<String, String>,
339) -> Result<(), TransformError> {
340    let key = match render_template(key_template, event) {
341        Ok(key_s) => key_s,
342        Err(TransformError::TemplateRenderingError(err)) => {
343            emit!(crate::internal_events::TemplateRenderingError {
344                error: err,
345                drop_event: false,
346                field: Some(key_template.get_ref()),
347            });
348            return Ok(());
349        }
350        Err(err) => return Err(err),
351    };
352    match value_template {
353        None => {
354            result.insert(key, TagValue::Bare);
355        }
356        Some(template) => match render_template(template, event) {
357            Ok(value) => {
358                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
359                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
360                result.extend(expanded_pairs);
361            }
362            Err(TransformError::TemplateRenderingError(value_error)) => {
363                emit!(crate::internal_events::TemplateRenderingError {
364                    error: value_error,
365                    drop_event: false,
366                    field: Some(template.get_ref()),
367                });
368                return Ok(());
369            }
370            Err(other) => return Err(other),
371        },
372    };
373    Ok(())
374}
375
376fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
377    let log = event.as_log();
378
379    let timestamp = log
380        .get_timestamp()
381        .and_then(Value::as_timestamp)
382        .cloned()
383        .or_else(|| Some(Utc::now()));
384
385    // Assign the OriginService for the new metric
386    let metadata = event
387        .metadata()
388        .clone()
389        .with_schema_definition(&Arc::new(Definition::any()))
390        .with_origin_metadata(DatadogMetricOriginMetadata::new(
391            None,
392            None,
393            Some(ORIGIN_SERVICE_VALUE),
394        ));
395
396    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
397        path: config.field().to_string(),
398    })?;
399
400    let value = match log.get(&field) {
401        None => Err(TransformError::PathNotFound {
402            path: field.to_string(),
403        }),
404        Some(Value::Null) => Err(TransformError::PathNull {
405            path: field.to_string(),
406        }),
407        Some(value) => Ok(value),
408    }?;
409
410    let name = config.name.as_ref().unwrap_or(&config.field);
411    let name = render_template(name, event)?;
412
413    let namespace = config.namespace.as_ref();
414    let namespace = namespace
415        .map(|namespace| render_template(namespace, event))
416        .transpose()?;
417
418    let tags = render_tags(&config.tags, event)?;
419
420    let (kind, value) = match &config.metric {
421        MetricTypeConfig::Counter(counter) => {
422            let value = if counter.increment_by_value {
423                value.to_string_lossy().parse().map_err(|error| {
424                    TransformError::ParseFloatError {
425                        path: config.field.get_ref().to_owned(),
426                        error,
427                    }
428                })?
429            } else {
430                1.0
431            };
432
433            (counter.kind, MetricValue::Counter { value })
434        }
435        MetricTypeConfig::Histogram => {
436            let value = value.to_string_lossy().parse().map_err(|error| {
437                TransformError::ParseFloatError {
438                    path: field.to_string(),
439                    error,
440                }
441            })?;
442
443            (
444                MetricKind::Incremental,
445                MetricValue::Distribution {
446                    samples: vector_lib::samples![value => 1],
447                    statistic: StatisticKind::Histogram,
448                },
449            )
450        }
451        MetricTypeConfig::Summary => {
452            let value = value.to_string_lossy().parse().map_err(|error| {
453                TransformError::ParseFloatError {
454                    path: field.to_string(),
455                    error,
456                }
457            })?;
458
459            (
460                MetricKind::Incremental,
461                MetricValue::Distribution {
462                    samples: vector_lib::samples![value => 1],
463                    statistic: StatisticKind::Summary,
464                },
465            )
466        }
467        MetricTypeConfig::Gauge => {
468            let value = value.to_string_lossy().parse().map_err(|error| {
469                TransformError::ParseFloatError {
470                    path: field.to_string(),
471                    error,
472                }
473            })?;
474
475            (MetricKind::Absolute, MetricValue::Gauge { value })
476        }
477        MetricTypeConfig::Set => {
478            let value = value.to_string_lossy().into_owned();
479
480            (
481                MetricKind::Incremental,
482                MetricValue::Set {
483                    values: std::iter::once(value).collect(),
484                },
485            )
486        }
487    };
488    Ok(Metric::new_with_metadata(name, kind, value, metadata)
489        .with_namespace(namespace)
490        .with_tags(tags)
491        .with_timestamp(timestamp))
492}
493
494fn bytes_to_str(value: &Value) -> Option<String> {
495    match value {
496        Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
497        _ => None,
498    }
499}
500
501fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
502    // TODO: update returned errors after `TransformError` is refactored.
503    let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
504        PathParseError::InvalidPathSyntax { path } => PathNotFound {
505            path: path.to_string(),
506        },
507    })?;
508    match maybe_value {
509        None => Err(PathNotFound {
510            path: path.to_string(),
511        }),
512        Some(v) => Ok(bytes_to_str(v)),
513    }
514}
515
516fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
517    let counter_value = log
518        .get(event_path!("counter", "value"))
519        .ok_or_else(|| TransformError::PathNotFound {
520            path: "counter.value".to_string(),
521        })?
522        .as_float()
523        .ok_or_else(|| TransformError::ParseError {
524            path: "counter.value".to_string(),
525            kind: TransformParseErrorKind::FloatError,
526        })?;
527
528    Ok(MetricValue::Counter {
529        value: *counter_value,
530    })
531}
532
533fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
534    let gauge_value = log
535        .get(event_path!("gauge", "value"))
536        .ok_or_else(|| TransformError::PathNotFound {
537            path: "gauge.value".to_string(),
538        })?
539        .as_float()
540        .ok_or_else(|| TransformError::ParseError {
541            path: "gauge.value".to_string(),
542            kind: TransformParseErrorKind::FloatError,
543        })?;
544    Ok(MetricValue::Gauge {
545        value: *gauge_value,
546    })
547}
548
549fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
550    let set_values = log
551        .get(event_path!("set", "values"))
552        .ok_or_else(|| TransformError::PathNotFound {
553            path: "set.values".to_string(),
554        })?
555        .as_array()
556        .ok_or_else(|| TransformError::ParseError {
557            path: "set.values".to_string(),
558            kind: TransformParseErrorKind::ArrayError,
559        })?;
560
561    let mut values: Vec<String> = Vec::new();
562    for e_value in set_values {
563        let value = e_value
564            .as_bytes()
565            .ok_or_else(|| TransformError::ParseError {
566                path: "set.values".to_string(),
567                kind: TransformParseErrorKind::ArrayError,
568            })?;
569        values.push(String::from_utf8_lossy(value).to_string());
570    }
571
572    Ok(MetricValue::Set {
573        values: values.into_iter().collect(),
574    })
575}
576
577fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
578    let event_samples = log
579        .get(event_path!("distribution", "samples"))
580        .ok_or_else(|| TransformError::PathNotFound {
581            path: "distribution.samples".to_string(),
582        })?
583        .as_array()
584        .ok_or_else(|| TransformError::ParseError {
585            path: "distribution.samples".to_string(),
586            kind: TransformParseErrorKind::ArrayError,
587        })?;
588
589    let mut samples: Vec<Sample> = Vec::new();
590    for e_sample in event_samples {
591        let value = e_sample
592            .get(path!("value"))
593            .ok_or_else(|| TransformError::PathNotFound {
594                path: "value".to_string(),
595            })?
596            .as_float()
597            .ok_or_else(|| TransformError::ParseError {
598                path: "value".to_string(),
599                kind: TransformParseErrorKind::FloatError,
600            })?;
601
602        let rate = e_sample
603            .get(path!("rate"))
604            .ok_or_else(|| TransformError::PathNotFound {
605                path: "rate".to_string(),
606            })?
607            .as_integer()
608            .ok_or_else(|| TransformError::ParseError {
609                path: "rate".to_string(),
610                kind: TransformParseErrorKind::IntError,
611            })?;
612
613        samples.push(Sample {
614            value: *value,
615            rate: rate as u32,
616        });
617    }
618
619    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
620        Some(n) => n,
621        None => {
622            return Err(TransformError::PathNotFound {
623                path: "distribution.statistic".to_string(),
624            })
625        }
626    };
627    let statistic_kind = match statistic_str.as_str() {
628        "histogram" => Ok(StatisticKind::Histogram),
629        "summary" => Ok(StatisticKind::Summary),
630        _ => Err(TransformError::MetricValueError {
631            path: "distribution.statistic".to_string(),
632            path_value: statistic_str.to_string(),
633        }),
634    }?;
635
636    Ok(MetricValue::Distribution {
637        samples,
638        statistic: statistic_kind,
639    })
640}
641
642fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
643    let event_buckets = log
644        .get(event_path!("histogram", "buckets"))
645        .ok_or_else(|| TransformError::PathNotFound {
646            path: "histogram.buckets".to_string(),
647        })?
648        .as_array()
649        .ok_or_else(|| TransformError::ParseError {
650            path: "histogram.buckets".to_string(),
651            kind: TransformParseErrorKind::ArrayError,
652        })?;
653
654    let mut buckets: Vec<Bucket> = Vec::new();
655    for e_bucket in event_buckets {
656        let upper_limit = e_bucket
657            .get(path!("upper_limit"))
658            .ok_or_else(|| TransformError::PathNotFound {
659                path: "histogram.buckets.upper_limit".to_string(),
660            })?
661            .as_float()
662            .ok_or_else(|| TransformError::ParseError {
663                path: "histogram.buckets.upper_limit".to_string(),
664                kind: TransformParseErrorKind::FloatError,
665            })?;
666
667        let count = e_bucket
668            .get(path!("count"))
669            .ok_or_else(|| TransformError::PathNotFound {
670                path: "histogram.buckets.count".to_string(),
671            })?
672            .as_integer()
673            .ok_or_else(|| TransformError::ParseError {
674                path: "histogram.buckets.count".to_string(),
675                kind: TransformParseErrorKind::IntError,
676            })?;
677
678        buckets.push(Bucket {
679            upper_limit: *upper_limit,
680            count: count as u64,
681        });
682    }
683
684    let count = log
685        .get(event_path!("histogram", "count"))
686        .ok_or_else(|| TransformError::PathNotFound {
687            path: "histogram.count".to_string(),
688        })?
689        .as_integer()
690        .ok_or_else(|| TransformError::ParseError {
691            path: "histogram.count".to_string(),
692            kind: TransformParseErrorKind::IntError,
693        })?;
694
695    let sum = log
696        .get(event_path!("histogram", "sum"))
697        .ok_or_else(|| TransformError::PathNotFound {
698            path: "histogram.sum".to_string(),
699        })?
700        .as_float()
701        .ok_or_else(|| TransformError::ParseError {
702            path: "histogram.sum".to_string(),
703            kind: TransformParseErrorKind::FloatError,
704        })?;
705
706    Ok(MetricValue::AggregatedHistogram {
707        buckets,
708        count: count as u64,
709        sum: *sum,
710    })
711}
712
713fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
714    let event_quantiles = log
715        .get(event_path!("summary", "quantiles"))
716        .ok_or_else(|| TransformError::PathNotFound {
717            path: "summary.quantiles".to_string(),
718        })?
719        .as_array()
720        .ok_or_else(|| TransformError::ParseError {
721            path: "summary.quantiles".to_string(),
722            kind: TransformParseErrorKind::ArrayError,
723        })?;
724
725    let mut quantiles: Vec<Quantile> = Vec::new();
726    for e_quantile in event_quantiles {
727        let quantile = e_quantile
728            .get(path!("quantile"))
729            .ok_or_else(|| TransformError::PathNotFound {
730                path: "summary.quantiles.quantile".to_string(),
731            })?
732            .as_float()
733            .ok_or_else(|| TransformError::ParseError {
734                path: "summary.quantiles.quantile".to_string(),
735                kind: TransformParseErrorKind::FloatError,
736            })?;
737
738        let value = e_quantile
739            .get(path!("value"))
740            .ok_or_else(|| TransformError::PathNotFound {
741                path: "summary.quantiles.value".to_string(),
742            })?
743            .as_float()
744            .ok_or_else(|| TransformError::ParseError {
745                path: "summary.quantiles.value".to_string(),
746                kind: TransformParseErrorKind::FloatError,
747            })?;
748
749        quantiles.push(Quantile {
750            quantile: *quantile,
751            value: *value,
752        })
753    }
754
755    let count = log
756        .get(event_path!("summary", "count"))
757        .ok_or_else(|| TransformError::PathNotFound {
758            path: "summary.count".to_string(),
759        })?
760        .as_integer()
761        .ok_or_else(|| TransformError::ParseError {
762            path: "summary.count".to_string(),
763            kind: TransformParseErrorKind::IntError,
764        })?;
765
766    let sum = log
767        .get(event_path!("summary", "sum"))
768        .ok_or_else(|| TransformError::PathNotFound {
769            path: "summary.sum".to_string(),
770        })?
771        .as_float()
772        .ok_or_else(|| TransformError::ParseError {
773            path: "summary.sum".to_string(),
774            kind: TransformParseErrorKind::FloatError,
775        })?;
776
777    Ok(MetricValue::AggregatedSummary {
778        quantiles,
779        count: count as u64,
780        sum: *sum,
781    })
782}
783
784fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
785    let log = event.as_log();
786    let timestamp = log
787        .get_timestamp()
788        .and_then(Value::as_timestamp)
789        .cloned()
790        .or_else(|| Some(Utc::now()));
791
792    let name = match try_get_string_from_log(log, "name")? {
793        Some(n) => n,
794        None => {
795            return Err(TransformError::PathNotFound {
796                path: "name".to_string(),
797            })
798        }
799    };
800
801    let mut tags = MetricTags::default();
802
803    if let Some(els) = log.get(event_path!("tags")) {
804        if let Some(el) = els.as_object() {
805            for (key, value) in el {
806                tags.insert(key.to_string(), bytes_to_str(value));
807            }
808        }
809    }
810    let tags_result = Some(tags);
811
812    let kind_str = match try_get_string_from_log(log, "kind")? {
813        Some(n) => n,
814        None => {
815            return Err(TransformError::PathNotFound {
816                path: "kind".to_string(),
817            })
818        }
819    };
820
821    let kind = match kind_str.as_str() {
822        "absolute" => Ok(MetricKind::Absolute),
823        "incremental" => Ok(MetricKind::Incremental),
824        value => Err(TransformError::MetricValueError {
825            path: "kind".to_string(),
826            path_value: value.to_string(),
827        }),
828    }?;
829
830    let mut value: Option<MetricValue> = None;
831    if let Some(root_event) = log.as_map() {
832        for key in root_event.keys() {
833            value = match key.as_str() {
834                "gauge" => Some(get_gauge_value(log)?),
835                "distribution" => Some(get_distribution_value(log)?),
836                "histogram" => Some(get_histogram_value(log)?),
837                "summary" => Some(get_summary_value(log)?),
838                "counter" => Some(get_counter_value(log)?),
839                "set" => Some(get_set_value(log)?),
840                _ => None,
841            };
842
843            if value.is_some() {
844                break;
845            }
846        }
847    }
848
849    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
850
851    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
852        .with_tags(tags_result)
853        .with_timestamp(timestamp);
854
855    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
856        metric = metric.with_namespace(namespace);
857    }
858
859    Ok(metric)
860}
861
862impl FunctionTransform for LogToMetric {
863    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
864        // Metrics are "all or none" for a specific log. If a single fails, none are produced.
865        let mut buffer = Vec::with_capacity(self.metrics.len());
866        if self.all_metrics {
867            match to_metrics(&event) {
868                Ok(metric) => {
869                    output.push(Event::Metric(metric));
870                }
871                Err(err) => {
872                    match err {
873                        TransformError::MetricValueError { path, path_value } => {
874                            emit!(MetricMetadataInvalidFieldValueError {
875                                field: path.as_ref(),
876                                field_value: path_value.as_ref()
877                            })
878                        }
879                        TransformError::PathNotFound { path } => {
880                            emit!(ParserMissingFieldError::<DROP_EVENT> {
881                                field: path.as_ref()
882                            })
883                        }
884                        TransformError::ParseError { path, kind } => {
885                            emit!(MetricMetadataParseError {
886                                field: path.as_ref(),
887                                kind: &kind.to_string(),
888                            })
889                        }
890                        TransformError::MetricDetailsNotFound => {
891                            emit!(MetricMetadataMetricDetailsNotFoundError {})
892                        }
893                        TransformError::PairExpansionError { key, value, error } => {
894                            emit!(crate::internal_events::PairExpansionError {
895                                key: &key,
896                                value: &value,
897                                drop_event: true,
898                                error
899                            })
900                        }
901                        _ => {}
902                    };
903                }
904            }
905        } else {
906            for config in self.metrics.iter() {
907                match to_metric_with_config(config, &event) {
908                    Ok(metric) => {
909                        buffer.push(Event::Metric(metric));
910                    }
911                    Err(err) => {
912                        match err {
913                            TransformError::PathNull { path } => {
914                                emit!(LogToMetricFieldNullError {
915                                    field: path.as_ref()
916                                })
917                            }
918                            TransformError::PathNotFound { path } => {
919                                emit!(ParserMissingFieldError::<DROP_EVENT> {
920                                    field: path.as_ref()
921                                })
922                            }
923                            TransformError::ParseFloatError { path, error } => {
924                                emit!(LogToMetricParseFloatError {
925                                    field: path.as_ref(),
926                                    error
927                                })
928                            }
929                            TransformError::TemplateRenderingError(error) => {
930                                emit!(crate::internal_events::TemplateRenderingError {
931                                    error,
932                                    drop_event: true,
933                                    field: None,
934                                })
935                            }
936                            TransformError::PairExpansionError { key, value, error } => {
937                                emit!(crate::internal_events::PairExpansionError {
938                                    key: &key,
939                                    value: &value,
940                                    drop_event: true,
941                                    error
942                                })
943                            }
944                            _ => {}
945                        };
946                        // early return to prevent the partial buffer from being sent
947                        return;
948                    }
949                }
950            }
951        }
952
953        // Metric generation was successful, publish them all.
954        for event in buffer {
955            output.push(event);
956        }
957    }
958}
959
960#[cfg(test)]
961mod tests {
962    use super::*;
963    use crate::test_util::components::assert_transform_compliance;
964    use crate::transforms::test::create_topology;
965    use crate::{
966        config::log_schema,
967        event::{
968            metric::{Metric, MetricKind, MetricValue, StatisticKind},
969            Event, LogEvent,
970        },
971    };
972    use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
973    use std::sync::Arc;
974    use std::time::Duration;
975    use tokio::sync::mpsc;
976    use tokio_stream::wrappers::ReceiverStream;
977    use vector_lib::config::ComponentKey;
978    use vector_lib::event::{EventMetadata, ObjectMap};
979    use vector_lib::metric_tags;
980
981    #[test]
982    fn generate_config() {
983        crate::test_util::test_generate_config::<LogToMetricConfig>();
984    }
985
986    fn parse_config(s: &str) -> LogToMetricConfig {
987        toml::from_str(s).unwrap()
988    }
989
990    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
991        serde_yaml::from_str(s).unwrap()
992    }
993
994    fn ts() -> DateTime<Utc> {
995        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
996            .single()
997            .and_then(|t| t.with_nanosecond(11))
998            .expect("invalid timestamp")
999    }
1000
1001    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1002        let mut log = Event::Log(LogEvent::from("i am a log"));
1003        log.as_mut_log().insert(key, value);
1004        log.as_mut_log()
1005            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1006        log
1007    }
1008
1009    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1010        assert_transform_compliance(async move {
1011            let (tx, rx) = mpsc::channel(1);
1012            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1013            tx.send(event).await.unwrap();
1014            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1015                .await
1016                .unwrap_or(None);
1017            drop(tx);
1018            topology.stop().await;
1019            assert_eq!(out.recv().await, None);
1020            result
1021        })
1022        .await
1023    }
1024
1025    async fn do_transform_multiple_events(
1026        config: LogToMetricConfig,
1027        event: Event,
1028        count: usize,
1029    ) -> Vec<Event> {
1030        assert_transform_compliance(async move {
1031            let (tx, rx) = mpsc::channel(1);
1032            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1033            tx.send(event).await.unwrap();
1034
1035            let mut results = vec![];
1036            for _ in 0..count {
1037                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1038                    .await
1039                    .unwrap_or(None);
1040                if let Some(event) = result {
1041                    results.push(event);
1042                }
1043            }
1044
1045            drop(tx);
1046            topology.stop().await;
1047            assert_eq!(out.recv().await, None);
1048            results
1049        })
1050        .await
1051    }
1052
1053    #[tokio::test]
1054    async fn count_http_status_codes() {
1055        let config = parse_config(
1056            r#"
1057            [[metrics]]
1058            type = "counter"
1059            field = "status"
1060            "#,
1061        );
1062
1063        let event = create_event("status", "42");
1064        let mut metadata =
1065            event
1066                .metadata()
1067                .clone()
1068                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1069                    None,
1070                    None,
1071                    Some(ORIGIN_SERVICE_VALUE),
1072                ));
1073        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1074        metadata.set_schema_definition(&Arc::new(Definition::any()));
1075        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1076        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1077        let metric = do_transform(config, event).await.unwrap();
1078
1079        assert_eq!(
1080            metric.into_metric(),
1081            Metric::new_with_metadata(
1082                "status",
1083                MetricKind::Incremental,
1084                MetricValue::Counter { value: 1.0 },
1085                metadata,
1086            )
1087            .with_timestamp(Some(ts()))
1088        );
1089    }
1090
1091    #[tokio::test]
1092    async fn count_http_requests_with_tags() {
1093        let config = parse_config(
1094            r#"
1095            [[metrics]]
1096            type = "counter"
1097            field = "message"
1098            name = "http_requests_total"
1099            namespace = "app"
1100            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1101            "#,
1102        );
1103
1104        let mut event = create_event("message", "i am log");
1105        event.as_mut_log().insert("method", "post");
1106        event.as_mut_log().insert("code", "200");
1107        let mut metadata =
1108            event
1109                .metadata()
1110                .clone()
1111                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1112                    None,
1113                    None,
1114                    Some(ORIGIN_SERVICE_VALUE),
1115                ));
1116        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1117        metadata.set_schema_definition(&Arc::new(Definition::any()));
1118        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1119        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1120
1121        let metric = do_transform(config, event).await.unwrap();
1122
1123        assert_eq!(
1124            metric.into_metric(),
1125            Metric::new_with_metadata(
1126                "http_requests_total",
1127                MetricKind::Incremental,
1128                MetricValue::Counter { value: 1.0 },
1129                metadata,
1130            )
1131            .with_namespace(Some("app"))
1132            .with_tags(Some(metric_tags!(
1133                "method" => "post",
1134                "code" => "200",
1135                "host" => "localhost",
1136            )))
1137            .with_timestamp(Some(ts()))
1138        );
1139    }
1140
1141    #[tokio::test]
1142    async fn count_http_requests_with_tags_expansion() {
1143        let config = parse_config(
1144            r#"
1145            [[metrics]]
1146            type = "counter"
1147            field = "message"
1148            name = "http_requests_total"
1149            namespace = "app"
1150            tags = {"*" = "{{ dict }}"}
1151            "#,
1152        );
1153
1154        let mut event = create_event("message", "i am log");
1155        let log = event.as_mut_log();
1156
1157        let mut test_dict = ObjectMap::default();
1158        test_dict.insert("one".into(), Value::from("foo"));
1159        test_dict.insert("two".into(), Value::from("baz"));
1160        log.insert("dict", Value::from(test_dict));
1161
1162        let mut metadata =
1163            event
1164                .metadata()
1165                .clone()
1166                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1167                    None,
1168                    None,
1169                    Some(ORIGIN_SERVICE_VALUE),
1170                ));
1171        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1172        metadata.set_schema_definition(&Arc::new(Definition::any()));
1173        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1174        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1175
1176        let metric = do_transform(config, event).await.unwrap();
1177
1178        assert_eq!(
1179            metric.into_metric(),
1180            Metric::new_with_metadata(
1181                "http_requests_total",
1182                MetricKind::Incremental,
1183                MetricValue::Counter { value: 1.0 },
1184                metadata,
1185            )
1186            .with_namespace(Some("app"))
1187            .with_tags(Some(metric_tags!(
1188                "one" => "foo",
1189                "two" => "baz",
1190            )))
1191            .with_timestamp(Some(ts()))
1192        );
1193    }
1194    #[tokio::test]
1195    async fn count_http_requests_with_colliding_dynamic_tags() {
1196        let config = parse_config(
1197            r#"
1198            [[metrics]]
1199            type = "counter"
1200            field = "message"
1201            name = "http_requests_total"
1202            namespace = "app"
1203            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1204            "#,
1205        );
1206
1207        let mut event = create_event("message", "i am log");
1208        let log = event.as_mut_log();
1209
1210        let mut map1 = ObjectMap::default();
1211        map1.insert("key1".into(), Value::from("val1"));
1212        log.insert("map1", Value::from(map1));
1213
1214        let mut map2 = ObjectMap::default();
1215        map2.insert("l1_key1".into(), Value::from("val2"));
1216        log.insert("map2", Value::from(map2));
1217
1218        let mut metadata =
1219            event
1220                .metadata()
1221                .clone()
1222                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1223                    None,
1224                    None,
1225                    Some(ORIGIN_SERVICE_VALUE),
1226                ));
1227        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1228        metadata.set_schema_definition(&Arc::new(Definition::any()));
1229        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1230        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1231
1232        let metric = do_transform(config, event).await.unwrap().into_metric();
1233        let tags = metric.tags().expect("Metric should have tags");
1234
1235        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1236
1237        assert_eq!(tags.iter_all().count(), 2);
1238        for (name, value) in tags.iter_all() {
1239            assert_eq!(name, "l1_key1");
1240            assert!(value == Some("val1") || value == Some("val2"));
1241        }
1242    }
1243    #[tokio::test]
1244    async fn multi_value_tags_yaml() {
1245        // Have to use YAML to represent bare tags
1246        let config = parse_yaml_config(
1247            r#"
1248            metrics:
1249            - field: "message"
1250              type: "counter"
1251              tags:
1252                tag:
1253                - "one"
1254                - null
1255                - "two"
1256            "#,
1257        );
1258
1259        let event = create_event("message", "I am log");
1260        let metric = do_transform(config, event).await.unwrap().into_metric();
1261        let tags = metric.tags().expect("Metric should have tags");
1262
1263        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1264
1265        assert_eq!(tags.iter_all().count(), 3);
1266        for (name, value) in tags.iter_all() {
1267            assert_eq!(name, "tag");
1268            assert!(value.is_none() || value == Some("one") || value == Some("two"));
1269        }
1270    }
1271    #[tokio::test]
1272    async fn multi_value_tags_expansion_yaml() {
1273        // Have to use YAML to represent bare tags
1274        let config = parse_yaml_config(
1275            r#"
1276            metrics:
1277            - field: "message"
1278              type: "counter"
1279              tags:
1280                "*": "{{dict}}"
1281            "#,
1282        );
1283
1284        let mut event = create_event("message", "I am log");
1285        let log = event.as_mut_log();
1286
1287        let mut test_dict = ObjectMap::default();
1288        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1289        log.insert("dict", Value::from(test_dict));
1290
1291        let metric = do_transform(config, event).await.unwrap().into_metric();
1292        let tags = metric.tags().expect("Metric should have tags");
1293
1294        assert_eq!(
1295            tags.iter_single().collect::<Vec<_>>(),
1296            vec![("one", "[\"foo\",\"baz\"]")]
1297        );
1298
1299        assert_eq!(tags.iter_all().count(), 1);
1300        for (name, value) in tags.iter_all() {
1301            assert_eq!(name, "one");
1302            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1303        }
1304    }
1305
1306    #[tokio::test]
1307    async fn multi_value_tags_toml() {
1308        let config = parse_config(
1309            r#"
1310            [[metrics]]
1311            field = "message"
1312            type = "counter"
1313            [metrics.tags]
1314            tag = ["one", "two"]
1315            "#,
1316        );
1317
1318        let event = create_event("message", "I am log");
1319        let metric = do_transform(config, event).await.unwrap().into_metric();
1320        let tags = metric.tags().expect("Metric should have tags");
1321
1322        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1323
1324        assert_eq!(tags.iter_all().count(), 2);
1325        for (name, value) in tags.iter_all() {
1326            assert_eq!(name, "tag");
1327            assert!(value == Some("one") || value == Some("two"));
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn count_exceptions() {
1333        let config = parse_config(
1334            r#"
1335            [[metrics]]
1336            type = "counter"
1337            field = "backtrace"
1338            name = "exception_total"
1339            "#,
1340        );
1341
1342        let event = create_event("backtrace", "message");
1343        let mut metadata =
1344            event
1345                .metadata()
1346                .clone()
1347                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1348                    None,
1349                    None,
1350                    Some(ORIGIN_SERVICE_VALUE),
1351                ));
1352        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1353        metadata.set_schema_definition(&Arc::new(Definition::any()));
1354        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1355        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1356
1357        let metric = do_transform(config, event).await.unwrap();
1358
1359        assert_eq!(
1360            metric.into_metric(),
1361            Metric::new_with_metadata(
1362                "exception_total",
1363                MetricKind::Incremental,
1364                MetricValue::Counter { value: 1.0 },
1365                metadata
1366            )
1367            .with_timestamp(Some(ts()))
1368        );
1369    }
1370
1371    #[tokio::test]
1372    async fn count_exceptions_no_match() {
1373        let config = parse_config(
1374            r#"
1375            [[metrics]]
1376            type = "counter"
1377            field = "backtrace"
1378            name = "exception_total"
1379            "#,
1380        );
1381
1382        let event = create_event("success", "42");
1383        assert_eq!(do_transform(config, event).await, None);
1384    }
1385
1386    #[tokio::test]
1387    async fn sum_order_amounts() {
1388        let config = parse_config(
1389            r#"
1390            [[metrics]]
1391            type = "counter"
1392            field = "amount"
1393            name = "amount_total"
1394            increment_by_value = true
1395            "#,
1396        );
1397
1398        let event = create_event("amount", "33.99");
1399        let mut metadata =
1400            event
1401                .metadata()
1402                .clone()
1403                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1404                    None,
1405                    None,
1406                    Some(ORIGIN_SERVICE_VALUE),
1407                ));
1408        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1409        metadata.set_schema_definition(&Arc::new(Definition::any()));
1410        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1411        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1412        let metric = do_transform(config, event).await.unwrap();
1413
1414        assert_eq!(
1415            metric.into_metric(),
1416            Metric::new_with_metadata(
1417                "amount_total",
1418                MetricKind::Incremental,
1419                MetricValue::Counter { value: 33.99 },
1420                metadata,
1421            )
1422            .with_timestamp(Some(ts()))
1423        );
1424    }
1425
1426    #[tokio::test]
1427    async fn count_absolute() {
1428        let config = parse_config(
1429            r#"
1430            [[metrics]]
1431            type = "counter"
1432            field = "amount"
1433            name = "amount_total"
1434            increment_by_value = true
1435            kind = "absolute"
1436            "#,
1437        );
1438
1439        let event = create_event("amount", "33.99");
1440        let mut metadata =
1441            event
1442                .metadata()
1443                .clone()
1444                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1445                    None,
1446                    None,
1447                    Some(ORIGIN_SERVICE_VALUE),
1448                ));
1449        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1450        metadata.set_schema_definition(&Arc::new(Definition::any()));
1451        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1452        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1453
1454        let metric = do_transform(config, event).await.unwrap();
1455
1456        assert_eq!(
1457            metric.into_metric(),
1458            Metric::new_with_metadata(
1459                "amount_total",
1460                MetricKind::Absolute,
1461                MetricValue::Counter { value: 33.99 },
1462                metadata,
1463            )
1464            .with_timestamp(Some(ts()))
1465        );
1466    }
1467
1468    #[tokio::test]
1469    async fn memory_usage_gauge() {
1470        let config = parse_config(
1471            r#"
1472            [[metrics]]
1473            type = "gauge"
1474            field = "memory_rss"
1475            name = "memory_rss_bytes"
1476            "#,
1477        );
1478
1479        let event = create_event("memory_rss", "123");
1480        let mut metadata =
1481            event
1482                .metadata()
1483                .clone()
1484                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1485                    None,
1486                    None,
1487                    Some(ORIGIN_SERVICE_VALUE),
1488                ));
1489
1490        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1491        metadata.set_schema_definition(&Arc::new(Definition::any()));
1492
1493        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1494        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1495
1496        let metric = do_transform(config, event).await.unwrap();
1497
1498        assert_eq!(
1499            metric.into_metric(),
1500            Metric::new_with_metadata(
1501                "memory_rss_bytes",
1502                MetricKind::Absolute,
1503                MetricValue::Gauge { value: 123.0 },
1504                metadata,
1505            )
1506            .with_timestamp(Some(ts()))
1507        );
1508    }
1509
1510    #[tokio::test]
1511    async fn parse_failure() {
1512        let config = parse_config(
1513            r#"
1514            [[metrics]]
1515            type = "counter"
1516            field = "status"
1517            name = "status_total"
1518            increment_by_value = true
1519            "#,
1520        );
1521
1522        let event = create_event("status", "not a number");
1523        assert_eq!(do_transform(config, event).await, None);
1524    }
1525
1526    #[tokio::test]
1527    async fn missing_field() {
1528        let config = parse_config(
1529            r#"
1530            [[metrics]]
1531            type = "counter"
1532            field = "status"
1533            name = "status_total"
1534            "#,
1535        );
1536
1537        let event = create_event("not foo", "not a number");
1538        assert_eq!(do_transform(config, event).await, None);
1539    }
1540
1541    #[tokio::test]
1542    async fn null_field() {
1543        let config = parse_config(
1544            r#"
1545            [[metrics]]
1546            type = "counter"
1547            field = "status"
1548            name = "status_total"
1549            "#,
1550        );
1551
1552        let event = create_event("status", Value::Null);
1553        assert_eq!(do_transform(config, event).await, None);
1554    }
1555
1556    #[tokio::test]
1557    async fn multiple_metrics() {
1558        let config = parse_config(
1559            r#"
1560            [[metrics]]
1561            type = "counter"
1562            field = "status"
1563
1564            [[metrics]]
1565            type = "counter"
1566            field = "backtrace"
1567            name = "exception_total"
1568            "#,
1569        );
1570
1571        let mut event = Event::Log(LogEvent::from("i am a log"));
1572        event
1573            .as_mut_log()
1574            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1575        event.as_mut_log().insert("status", "42");
1576        event.as_mut_log().insert("backtrace", "message");
1577        let mut metadata =
1578            event
1579                .metadata()
1580                .clone()
1581                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1582                    None,
1583                    None,
1584                    Some(ORIGIN_SERVICE_VALUE),
1585                ));
1586
1587        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1588        metadata.set_schema_definition(&Arc::new(Definition::any()));
1589        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1590        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1591
1592        let output = do_transform_multiple_events(config, event, 2).await;
1593
1594        assert_eq!(2, output.len());
1595        assert_eq!(
1596            output[0].clone().into_metric(),
1597            Metric::new_with_metadata(
1598                "status",
1599                MetricKind::Incremental,
1600                MetricValue::Counter { value: 1.0 },
1601                metadata.clone(),
1602            )
1603            .with_timestamp(Some(ts()))
1604        );
1605        assert_eq!(
1606            output[1].clone().into_metric(),
1607            Metric::new_with_metadata(
1608                "exception_total",
1609                MetricKind::Incremental,
1610                MetricValue::Counter { value: 1.0 },
1611                metadata,
1612            )
1613            .with_timestamp(Some(ts()))
1614        );
1615    }
1616
1617    #[tokio::test]
1618    async fn multiple_metrics_with_multiple_templates() {
1619        let config = parse_config(
1620            r#"
1621            [[metrics]]
1622            type = "set"
1623            field = "status"
1624            name = "{{host}}_{{worker}}_status_set"
1625
1626            [[metrics]]
1627            type = "counter"
1628            field = "backtrace"
1629            name = "{{service}}_exception_total"
1630            namespace = "{{host}}"
1631            "#,
1632        );
1633
1634        let mut event = Event::Log(LogEvent::from("i am a log"));
1635        event
1636            .as_mut_log()
1637            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1638        event.as_mut_log().insert("status", "42");
1639        event.as_mut_log().insert("backtrace", "message");
1640        event.as_mut_log().insert("host", "local");
1641        event.as_mut_log().insert("worker", "abc");
1642        event.as_mut_log().insert("service", "xyz");
1643        let mut metadata =
1644            event
1645                .metadata()
1646                .clone()
1647                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1648                    None,
1649                    None,
1650                    Some(ORIGIN_SERVICE_VALUE),
1651                ));
1652
1653        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1654        metadata.set_schema_definition(&Arc::new(Definition::any()));
1655        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1656        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1657
1658        let output = do_transform_multiple_events(config, event, 2).await;
1659
1660        assert_eq!(2, output.len());
1661        assert_eq!(
1662            output[0].as_metric(),
1663            &Metric::new_with_metadata(
1664                "local_abc_status_set",
1665                MetricKind::Incremental,
1666                MetricValue::Set {
1667                    values: vec!["42".into()].into_iter().collect()
1668                },
1669                metadata.clone(),
1670            )
1671            .with_timestamp(Some(ts()))
1672        );
1673        assert_eq!(
1674            output[1].as_metric(),
1675            &Metric::new_with_metadata(
1676                "xyz_exception_total",
1677                MetricKind::Incremental,
1678                MetricValue::Counter { value: 1.0 },
1679                metadata,
1680            )
1681            .with_namespace(Some("local"))
1682            .with_timestamp(Some(ts()))
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn user_ip_set() {
1688        let config = parse_config(
1689            r#"
1690            [[metrics]]
1691            type = "set"
1692            field = "user_ip"
1693            name = "unique_user_ip"
1694            "#,
1695        );
1696
1697        let event = create_event("user_ip", "1.2.3.4");
1698        let mut metadata =
1699            event
1700                .metadata()
1701                .clone()
1702                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1703                    None,
1704                    None,
1705                    Some(ORIGIN_SERVICE_VALUE),
1706                ));
1707        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1708        metadata.set_schema_definition(&Arc::new(Definition::any()));
1709        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1710        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1711
1712        let metric = do_transform(config, event).await.unwrap();
1713
1714        assert_eq!(
1715            metric.into_metric(),
1716            Metric::new_with_metadata(
1717                "unique_user_ip",
1718                MetricKind::Incremental,
1719                MetricValue::Set {
1720                    values: vec!["1.2.3.4".into()].into_iter().collect()
1721                },
1722                metadata,
1723            )
1724            .with_timestamp(Some(ts()))
1725        );
1726    }
1727
1728    #[tokio::test]
1729    async fn response_time_histogram() {
1730        let config = parse_config(
1731            r#"
1732            [[metrics]]
1733            type = "histogram"
1734            field = "response_time"
1735            "#,
1736        );
1737
1738        let event = create_event("response_time", "2.5");
1739        let mut metadata =
1740            event
1741                .metadata()
1742                .clone()
1743                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1744                    None,
1745                    None,
1746                    Some(ORIGIN_SERVICE_VALUE),
1747                ));
1748
1749        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1750        metadata.set_schema_definition(&Arc::new(Definition::any()));
1751        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1752        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1753
1754        let metric = do_transform(config, event).await.unwrap();
1755
1756        assert_eq!(
1757            metric.into_metric(),
1758            Metric::new_with_metadata(
1759                "response_time",
1760                MetricKind::Incremental,
1761                MetricValue::Distribution {
1762                    samples: vector_lib::samples![2.5 => 1],
1763                    statistic: StatisticKind::Histogram
1764                },
1765                metadata
1766            )
1767            .with_timestamp(Some(ts()))
1768        );
1769    }
1770
1771    #[tokio::test]
1772    async fn response_time_summary() {
1773        let config = parse_config(
1774            r#"
1775            [[metrics]]
1776            type = "summary"
1777            field = "response_time"
1778            "#,
1779        );
1780
1781        let event = create_event("response_time", "2.5");
1782        let mut metadata =
1783            event
1784                .metadata()
1785                .clone()
1786                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1787                    None,
1788                    None,
1789                    Some(ORIGIN_SERVICE_VALUE),
1790                ));
1791
1792        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1793        metadata.set_schema_definition(&Arc::new(Definition::any()));
1794        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1795        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1796
1797        let metric = do_transform(config, event).await.unwrap();
1798
1799        assert_eq!(
1800            metric.into_metric(),
1801            Metric::new_with_metadata(
1802                "response_time",
1803                MetricKind::Incremental,
1804                MetricValue::Distribution {
1805                    samples: vector_lib::samples![2.5 => 1],
1806                    statistic: StatisticKind::Summary
1807                },
1808                metadata
1809            )
1810            .with_timestamp(Some(ts()))
1811        );
1812    }
1813
    // Metric metadata tests: log events that already describe a metric, converted with
    // `all_metrics` enabled.
1816    fn create_log_event(json_str: &str) -> Event {
1817        create_log_event_with_namespace(json_str, Some("test_namespace"))
1818    }
1819
1820    fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1821        let mut log_value: Value =
1822            serde_json::from_str(json_str).expect("JSON was not well-formatted");
1823        log_value.insert("timestamp", ts());
1824
1825        if let Some(namespace) = namespace {
1826            log_value.insert("namespace", namespace);
1827        }
1828
1829        let mut metadata = EventMetadata::default();
1830        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1831        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1832
1833        Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1834    }
1835
1836    #[tokio::test]
1837    async fn transform_gauge() {
1838        let config = LogToMetricConfig {
1839            metrics: None,
1840            all_metrics: Some(true),
1841        };
1842
1843        let json_str = r#"{
1844          "gauge": {
1845            "value": 990.0
1846          },
1847          "kind": "absolute",
1848          "name": "test.transform.gauge",
1849          "tags": {
1850            "env": "test_env",
1851            "host": "localhost"
1852          }
1853        }"#;
1854        let log = create_log_event(json_str);
1855        let metric = do_transform(config, log.clone()).await.unwrap();
1856        assert_eq!(
1857            *metric.as_metric(),
1858            Metric::new_with_metadata(
1859                "test.transform.gauge",
1860                MetricKind::Absolute,
1861                MetricValue::Gauge { value: 990.0 },
1862                metric.metadata().clone(),
1863            )
1864            .with_namespace(Some("test_namespace"))
1865            .with_tags(Some(metric_tags!(
1866                "env" => "test_env",
1867                "host" => "localhost",
1868            )))
1869            .with_timestamp(Some(ts()))
1870        );
1871    }
1872
1873    #[tokio::test]
1874    async fn transform_histogram() {
1875        let config = LogToMetricConfig {
1876            metrics: None,
1877            all_metrics: Some(true),
1878        };
1879
1880        let json_str = r#"{
1881          "histogram": {
1882            "sum": 18.0,
1883            "count": 5,
1884            "buckets": [
1885              {
1886                "upper_limit": 1.0,
1887                "count": 1
1888              },
1889              {
1890                "upper_limit": 2.0,
1891                "count": 2
1892              },
1893              {
1894                "upper_limit": 5.0,
1895                "count": 1
1896              },
1897              {
1898                "upper_limit": 10.0,
1899                "count": 1
1900              }
1901            ]
1902          },
1903          "kind": "absolute",
1904          "name": "test.transform.histogram",
1905          "tags": {
1906            "env": "test_env",
1907            "host": "localhost"
1908          }
1909        }"#;
1910        let log = create_log_event(json_str);
1911        let metric = do_transform(config, log.clone()).await.unwrap();
1912        assert_eq!(
1913            *metric.as_metric(),
1914            Metric::new_with_metadata(
1915                "test.transform.histogram",
1916                MetricKind::Absolute,
1917                MetricValue::AggregatedHistogram {
1918                    count: 5,
1919                    sum: 18.0,
1920                    buckets: vec![
1921                        Bucket {
1922                            upper_limit: 1.0,
1923                            count: 1,
1924                        },
1925                        Bucket {
1926                            upper_limit: 2.0,
1927                            count: 2,
1928                        },
1929                        Bucket {
1930                            upper_limit: 5.0,
1931                            count: 1,
1932                        },
1933                        Bucket {
1934                            upper_limit: 10.0,
1935                            count: 1,
1936                        },
1937                    ],
1938                },
1939                metric.metadata().clone(),
1940            )
1941            .with_namespace(Some("test_namespace"))
1942            .with_tags(Some(metric_tags!(
1943                "env" => "test_env",
1944                "host" => "localhost",
1945            )))
1946            .with_timestamp(Some(ts()))
1947        );
1948    }
1949
1950    #[tokio::test]
1951    async fn transform_distribution_histogram() {
1952        let config = LogToMetricConfig {
1953            metrics: None,
1954            all_metrics: Some(true),
1955        };
1956
1957        let json_str = r#"{
1958          "distribution": {
1959            "samples": [
1960              {
1961                "value": 1.0,
1962                "rate": 1
1963              },
1964              {
1965                "value": 2.0,
1966                "rate": 2
1967              }
1968            ],
1969            "statistic": "histogram"
1970          },
1971          "kind": "absolute",
1972          "name": "test.transform.distribution_histogram",
1973          "tags": {
1974            "env": "test_env",
1975            "host": "localhost"
1976          }
1977        }"#;
1978        let log = create_log_event(json_str);
1979        let metric = do_transform(config, log.clone()).await.unwrap();
1980        assert_eq!(
1981            *metric.as_metric(),
1982            Metric::new_with_metadata(
1983                "test.transform.distribution_histogram",
1984                MetricKind::Absolute,
1985                MetricValue::Distribution {
1986                    samples: vec![
1987                        Sample {
1988                            value: 1.0,
1989                            rate: 1
1990                        },
1991                        Sample {
1992                            value: 2.0,
1993                            rate: 2
1994                        },
1995                    ],
1996                    statistic: StatisticKind::Histogram,
1997                },
1998                metric.metadata().clone(),
1999            )
2000            .with_namespace(Some("test_namespace"))
2001            .with_tags(Some(metric_tags!(
2002                "env" => "test_env",
2003                "host" => "localhost",
2004            )))
2005            .with_timestamp(Some(ts()))
2006        );
2007    }
2008
2009    #[tokio::test]
2010    async fn transform_distribution_summary() {
2011        let config = LogToMetricConfig {
2012            metrics: None,
2013            all_metrics: Some(true),
2014        };
2015
2016        let json_str = r#"{
2017          "distribution": {
2018            "samples": [
2019              {
2020                "value": 1.0,
2021                "rate": 1
2022              },
2023              {
2024                "value": 2.0,
2025                "rate": 2
2026              }
2027            ],
2028            "statistic": "summary"
2029          },
2030          "kind": "absolute",
2031          "name": "test.transform.distribution_summary",
2032          "tags": {
2033            "env": "test_env",
2034            "host": "localhost"
2035          }
2036        }"#;
2037        let log = create_log_event(json_str);
2038        let metric = do_transform(config, log.clone()).await.unwrap();
2039        assert_eq!(
2040            *metric.as_metric(),
2041            Metric::new_with_metadata(
2042                "test.transform.distribution_summary",
2043                MetricKind::Absolute,
2044                MetricValue::Distribution {
2045                    samples: vec![
2046                        Sample {
2047                            value: 1.0,
2048                            rate: 1
2049                        },
2050                        Sample {
2051                            value: 2.0,
2052                            rate: 2
2053                        },
2054                    ],
2055                    statistic: StatisticKind::Summary,
2056                },
2057                metric.metadata().clone(),
2058            )
2059            .with_namespace(Some("test_namespace"))
2060            .with_tags(Some(metric_tags!(
2061                "env" => "test_env",
2062                "host" => "localhost",
2063            )))
2064            .with_timestamp(Some(ts()))
2065        );
2066    }
2067
2068    #[tokio::test]
2069    async fn transform_summary() {
2070        let config = LogToMetricConfig {
2071            metrics: None,
2072            all_metrics: Some(true),
2073        };
2074
2075        let json_str = r#"{
2076          "summary": {
2077            "sum": 100.0,
2078            "count": 7,
2079            "quantiles": [
2080              {
2081                "quantile": 0.05,
2082                "value": 10.0
2083              },
2084              {
2085                "quantile": 0.95,
2086                "value": 25.0
2087              }
2088            ]
2089          },
2090          "kind": "absolute",
2091          "name": "test.transform.histogram",
2092          "tags": {
2093            "env": "test_env",
2094            "host": "localhost"
2095          }
2096        }"#;
2097        let log = create_log_event(json_str);
2098        let metric = do_transform(config, log.clone()).await.unwrap();
2099        assert_eq!(
2100            *metric.as_metric(),
2101            Metric::new_with_metadata(
2102                "test.transform.histogram",
2103                MetricKind::Absolute,
2104                MetricValue::AggregatedSummary {
2105                    quantiles: vec![
2106                        Quantile {
2107                            quantile: 0.05,
2108                            value: 10.0,
2109                        },
2110                        Quantile {
2111                            quantile: 0.95,
2112                            value: 25.0,
2113                        },
2114                    ],
2115                    count: 7,
2116                    sum: 100.0,
2117                },
2118                metric.metadata().clone(),
2119            )
2120            .with_namespace(Some("test_namespace"))
2121            .with_tags(Some(metric_tags!(
2122                "env" => "test_env",
2123                "host" => "localhost",
2124            )))
2125            .with_timestamp(Some(ts()))
2126        );
2127    }
2128
2129    #[tokio::test]
2130    async fn transform_counter() {
2131        let config = LogToMetricConfig {
2132            metrics: None,
2133            all_metrics: Some(true),
2134        };
2135
2136        let json_str = r#"{
2137          "counter": {
2138            "value": 10.0
2139          },
2140          "kind": "incremental",
2141          "name": "test.transform.counter",
2142          "tags": {
2143            "env": "test_env",
2144            "host": "localhost"
2145          }
2146        }"#;
2147        let log = create_log_event(json_str);
2148        let metric = do_transform(config, log.clone()).await.unwrap();
2149        assert_eq!(
2150            *metric.as_metric(),
2151            Metric::new_with_metadata(
2152                "test.transform.counter",
2153                MetricKind::Incremental,
2154                MetricValue::Counter { value: 10.0 },
2155                metric.metadata().clone(),
2156            )
2157            .with_namespace(Some("test_namespace"))
2158            .with_tags(Some(metric_tags!(
2159                "env" => "test_env",
2160                "host" => "localhost",
2161            )))
2162            .with_timestamp(Some(ts()))
2163        );
2164    }
2165
2166    #[tokio::test]
2167    async fn transform_set() {
2168        let config = LogToMetricConfig {
2169            metrics: None,
2170            all_metrics: Some(true),
2171        };
2172
2173        let json_str = r#"{
2174          "set": {
2175            "values": ["990.0", "1234"]
2176          },
2177          "kind": "incremental",
2178          "name": "test.transform.set",
2179          "tags": {
2180            "env": "test_env",
2181            "host": "localhost"
2182          }
2183        }"#;
2184        let log = create_log_event(json_str);
2185        let metric = do_transform(config, log.clone()).await.unwrap();
2186        assert_eq!(
2187            *metric.as_metric(),
2188            Metric::new_with_metadata(
2189                "test.transform.set",
2190                MetricKind::Incremental,
2191                MetricValue::Set {
2192                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2193                },
2194                metric.metadata().clone(),
2195            )
2196            .with_namespace(Some("test_namespace"))
2197            .with_tags(Some(metric_tags!(
2198                "env" => "test_env",
2199                "host" => "localhost",
2200            )))
2201            .with_timestamp(Some(ts()))
2202        );
2203    }
2204
2205    #[tokio::test]
2206    async fn transform_all_metrics_optional_namespace() {
2207        let config = LogToMetricConfig {
2208            metrics: None,
2209            all_metrics: Some(true),
2210        };
2211
2212        let json_str = r#"{
2213          "counter": {
2214            "value": 10.0
2215          },
2216          "kind": "incremental",
2217          "name": "test.transform.counter",
2218          "tags": {
2219            "env": "test_env",
2220            "host": "localhost"
2221          }
2222        }"#;
2223        let log = create_log_event_with_namespace(json_str, None);
2224        let metric = do_transform(config, log.clone()).await.unwrap();
2225        assert_eq!(
2226            *metric.as_metric(),
2227            Metric::new_with_metadata(
2228                "test.transform.counter",
2229                MetricKind::Incremental,
2230                MetricValue::Counter { value: 10.0 },
2231                metric.metadata().clone(),
2232            )
2233            .with_tags(Some(metric_tags!(
2234                "env" => "test_env",
2235                "host" => "localhost",
2236            )))
2237            .with_timestamp(Some(ts()))
2238        );
2239    }
2240}