vector/transforms/
log_to_metric.rs

1use std::{collections::HashMap, num::ParseFloatError, sync::Arc};
2
3use chrono::Utc;
4use indexmap::IndexMap;
5use vector_lib::{
6    configurable::configurable_component,
7    event::{
8        DatadogMetricOriginMetadata, LogEvent,
9        metric::{Bucket, Quantile, Sample},
10    },
11};
12use vrl::{
13    event_path, path,
14    path::{PathParseError, parse_target_path},
15};
16
17use crate::{
18    common::expansion::pair_expansion,
19    config::{
20        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
21        TransformOutput, schema::Definition,
22    },
23    event::{
24        Event, Value,
25        metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
26    },
27    internal_events::{
28        DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
29        MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
30        MetricMetadataParseError, ParserMissingFieldError,
31    },
32    schema,
33    template::{Template, TemplateRenderingError},
34    transforms::{
35        FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
36    },
37};
38
/// Origin service identifier attached to metrics built by `to_metric_with_config`, where it is
/// passed as the third argument of `DatadogMetricOriginMetadata::new`.
// NOTE(review): presumably `3` is the origin-service code assigned to this transform — confirm.
const ORIGIN_SERVICE_VALUE: u32 = 3;
40
/// Configuration for the `log_to_metric` transform.
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate.
    pub metrics: Option<Vec<MetricConfig>>,

    /// Setting this flag changes the behavior of this transformation.
    /// Notably the `metrics` field will be ignored.
    /// All incoming events will be processed and if possible they will be converted to
    /// metric events.
    /// Otherwise, only items specified in the `metrics` field will be processed.
    ///
    /// Example:
    /// <pre class="chroma"><code class="language-toml" data-lang="toml">{
    ///     "counter": {
    ///         "value": 10.0
    ///     },
    ///     "kind": "incremental",
    ///     "name": "test.transform.counter",
    ///     "tags": {
    ///         "env": "test_env",
    ///         "host": "localhost"
    ///     }
    /// }
    /// </code></pre>
    ///
    /// This is a JSON representation of a counter with the following properties:
    ///
    /// - `counter`: An object with a single property `value` representing the counter value, in this case, `10.0`.
    /// - `kind`: A string indicating the kind of counter, in this case, "incremental".
    /// - `name`: A string representing the name of the counter, here set to "test.transform.counter".
    /// - `tags`: An object containing additional tags such as "env" and "host".
    ///
    /// Objects that can be processed include counter, histogram, gauge, set and summary.
    // When unset, `build` treats this flag as `false`.
    pub all_metrics: Option<bool>,
}
78
/// Specification of a counter derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the value in `field`, instead of only by `1`.
    // Defaults to `false` (see `default_increment_by_value`).
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    // Defaults to `MetricKind::Incremental` (see `default_kind`); the description is derived
    // from `MetricKind` itself.
    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}
91
/// Specification of a metric derived from a log event.
// TODO: While we're resolving the schema for this enum somewhat reasonably (in
// `generate-components-docs.rb`), we have a problem where an overlapping field (overlap between two
// or more of the subschemas) takes the details of the last subschema to be iterated over that
// contains that field, such that, for example, the `Summary` variant below is overriding the
// description for almost all of the fields because they're shared across all of the variants.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Name of the field in the event to generate the metric.
    // The raw (unrendered) template string is what gets parsed as the lookup path; the rendered
    // template is only used as the fallback metric name when `name` is not set.
    pub field: Template,

    /// Overrides the name of the metric.
    ///
    /// If not specified, `field` is used as the name of the metric.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    ///
    /// Both keys and values can be templated, allowing you to attach dynamic tags to events.
    ///
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    // Flattened: the `type` tag and any type-specific options sit next to the fields above.
    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}
123
/// Specification of the value of a created tag.
///
/// This may be a single value, a `null` for a bare tag, or an array of either.
// Untagged: the variant is chosen from the shape of the configured value, not from a type key.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    // `None` yields a bare tag (`TagValue::Bare`) when rendered.
    Plain(Option<Template>),

    /// An array of values to give to the same tag name.
    // Each entry is rendered independently under the same tag key.
    Multi(Vec<Option<Template>>),
}
137
/// Specification of the type of an individual metric, and any associated data.
// Internally tagged: the `type` key selects the variant, spelled in snake_case.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    Counter(CounterConfig),

    /// A histogram.
    // Emitted as an incremental distribution with `StatisticKind::Histogram`.
    Histogram,

    /// A gauge.
    // Emitted as an absolute gauge.
    Gauge,

    /// A set.
    // Emitted as an incremental set holding the field's string form.
    Set,

    /// A summary.
    // Emitted as an incremental distribution with `StatisticKind::Summary`.
    Summary,
}
159
impl MetricConfig {
    /// Returns the raw, unrendered source string of the `field` template.
    fn field(&self) -> &str {
        self.field.get_ref()
    }
}
165
/// Serde default for `CounterConfig::increment_by_value`: counters increment by `1` unless
/// configured otherwise.
const fn default_increment_by_value() -> bool {
    false
}
169
/// Serde default for `CounterConfig::kind`: counters are incremental unless configured otherwise.
const fn default_kind() -> MetricKind {
    MetricKind::Incremental
}
173
/// Runtime state of the transform, built from `LogToMetricConfig` with its optional fields
/// resolved to defaults.
#[derive(Debug, Clone)]
pub struct LogToMetric {
    /// Metric definitions to evaluate against each log; empty when none were configured.
    pub metrics: Vec<MetricConfig>,
    /// When `true`, each log is converted as a whole via `to_metrics` and `metrics` is not used.
    pub all_metrics: bool,
}
179
180impl GenerateConfig for LogToMetricConfig {
181    fn generate_config() -> toml::Value {
182        toml::Value::try_from(Self {
183            metrics: Some(vec![MetricConfig {
184                field: "field_name".try_into().expect("Fixed template"),
185                name: None,
186                namespace: None,
187                tags: None,
188                metric: MetricTypeConfig::Counter(CounterConfig {
189                    increment_by_value: false,
190                    kind: MetricKind::Incremental,
191                }),
192            }]),
193            all_metrics: Some(true),
194        })
195        .unwrap()
196    }
197}
198
199#[async_trait::async_trait]
200#[typetag::serde(name = "log_to_metric")]
201impl TransformConfig for LogToMetricConfig {
202    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
203        Ok(Transform::function(LogToMetric {
204            metrics: self.metrics.clone().unwrap_or_default(),
205            all_metrics: self.all_metrics.unwrap_or_default(),
206        }))
207    }
208
209    fn input(&self) -> Input {
210        Input::log()
211    }
212
213    fn outputs(
214        &self,
215        _: &TransformContext,
216        _: &[(OutputId, schema::Definition)],
217    ) -> Vec<TransformOutput> {
218        // Converting the log to a metric means we lose all incoming `Definition`s.
219        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
220    }
221
222    fn enable_concurrency(&self) -> bool {
223        true
224    }
225}
226
/// Kinds of `TransformError` raised while parsing metric details out of a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// Error when parsing a float.
    FloatError,
    /// Error when parsing an integer.
    IntError,
    /// Error when parsing an array.
    ArrayError,
}
238
239impl std::fmt::Display for TransformParseErrorKind {
240    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
241        write!(f, "{self:?}")
242    }
243}
244
/// Failures that can occur while turning a log event into a metric.
enum TransformError {
    /// The path could not be parsed, or no value exists at it in the log.
    PathNotFound {
        path: String,
    },
    /// The path exists but holds `Value::Null`.
    PathNull {
        path: String,
    },
    /// None of the recognized metric keys (`gauge`, `distribution`, `histogram`, `summary`,
    /// `counter`, `set`) was found at the root of the log.
    MetricDetailsNotFound,
    /// The value at `path` is not one of the accepted values (used for `kind` and
    /// `distribution.statistic`).
    MetricValueError {
        path: String,
        path_value: String,
    },
    /// The value at `path` does not have the expected type, identified by `kind`.
    ParseError {
        path: String,
        kind: TransformParseErrorKind,
    },
    /// The string form of the value at `path` could not be parsed as a float.
    ParseFloatError {
        path: String,
        error: ParseFloatError,
    },
    /// A template failed to render against the event.
    TemplateRenderingError(TemplateRenderingError),
    /// Expanding a rendered tag key/value pair failed.
    PairExpansionError {
        key: String,
        value: String,
        error: serde_json::Error,
    },
}
272
273fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
274    template
275        .render_string(event)
276        .map_err(TransformError::TemplateRenderingError)
277}
278
279fn render_tags(
280    tags: &Option<IndexMap<Template, TagConfig>>,
281    event: &Event,
282) -> Result<Option<MetricTags>, TransformError> {
283    let mut static_tags: HashMap<String, String> = HashMap::new();
284    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
285    Ok(match tags {
286        None => None,
287        Some(tags) => {
288            let mut result = MetricTags::default();
289            for (name, config) in tags {
290                match config {
291                    TagConfig::Plain(template) => {
292                        render_tag_into(
293                            event,
294                            name,
295                            template.as_ref(),
296                            &mut result,
297                            &mut static_tags,
298                            &mut dynamic_tags,
299                        )?;
300                    }
301                    TagConfig::Multi(vec) => {
302                        for template in vec {
303                            render_tag_into(
304                                event,
305                                name,
306                                template.as_ref(),
307                                &mut result,
308                                &mut static_tags,
309                                &mut dynamic_tags,
310                            )?;
311                        }
312                    }
313                }
314            }
315            for (k, v) in static_tags {
316                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
317                    warn!(
318                        "Static tags overrides dynamic tags. \
319                key: {}, value: {:?}, discarded value: {:?}",
320                        k, v, discarded_v
321                    );
322                };
323            }
324            result.as_option()
325        }
326    })
327}
328
329fn render_tag_into(
330    event: &Event,
331    key_template: &Template,
332    value_template: Option<&Template>,
333    result: &mut MetricTags,
334    static_tags: &mut HashMap<String, String>,
335    dynamic_tags: &mut HashMap<String, String>,
336) -> Result<(), TransformError> {
337    let key = match render_template(key_template, event) {
338        Ok(key_s) => key_s,
339        Err(TransformError::TemplateRenderingError(err)) => {
340            emit!(crate::internal_events::TemplateRenderingError {
341                error: err,
342                drop_event: false,
343                field: Some(key_template.get_ref()),
344            });
345            return Ok(());
346        }
347        Err(err) => return Err(err),
348    };
349    match value_template {
350        None => {
351            result.insert(key, TagValue::Bare);
352        }
353        Some(template) => match render_template(template, event) {
354            Ok(value) => {
355                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
356                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
357                result.extend(expanded_pairs);
358            }
359            Err(TransformError::TemplateRenderingError(value_error)) => {
360                emit!(crate::internal_events::TemplateRenderingError {
361                    error: value_error,
362                    drop_event: false,
363                    field: Some(template.get_ref()),
364                });
365                return Ok(());
366            }
367            Err(other) => return Err(other),
368        },
369    };
370    Ok(())
371}
372
373fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
374    let log = event.as_log();
375
376    let timestamp = log
377        .get_timestamp()
378        .and_then(Value::as_timestamp)
379        .cloned()
380        .or_else(|| Some(Utc::now()));
381
382    // Assign the OriginService for the new metric
383    let metadata = event
384        .metadata()
385        .clone()
386        .with_schema_definition(&Arc::new(Definition::any()))
387        .with_origin_metadata(DatadogMetricOriginMetadata::new(
388            None,
389            None,
390            Some(ORIGIN_SERVICE_VALUE),
391        ));
392
393    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
394        path: config.field().to_string(),
395    })?;
396
397    let value = match log.get(&field) {
398        None => Err(TransformError::PathNotFound {
399            path: field.to_string(),
400        }),
401        Some(Value::Null) => Err(TransformError::PathNull {
402            path: field.to_string(),
403        }),
404        Some(value) => Ok(value),
405    }?;
406
407    let name = config.name.as_ref().unwrap_or(&config.field);
408    let name = render_template(name, event)?;
409
410    let namespace = config.namespace.as_ref();
411    let namespace = namespace
412        .map(|namespace| render_template(namespace, event))
413        .transpose()?;
414
415    let tags = render_tags(&config.tags, event)?;
416
417    let (kind, value) = match &config.metric {
418        MetricTypeConfig::Counter(counter) => {
419            let value = if counter.increment_by_value {
420                value.to_string_lossy().parse().map_err(|error| {
421                    TransformError::ParseFloatError {
422                        path: config.field.get_ref().to_owned(),
423                        error,
424                    }
425                })?
426            } else {
427                1.0
428            };
429
430            (counter.kind, MetricValue::Counter { value })
431        }
432        MetricTypeConfig::Histogram => {
433            let value = value.to_string_lossy().parse().map_err(|error| {
434                TransformError::ParseFloatError {
435                    path: field.to_string(),
436                    error,
437                }
438            })?;
439
440            (
441                MetricKind::Incremental,
442                MetricValue::Distribution {
443                    samples: vector_lib::samples![value => 1],
444                    statistic: StatisticKind::Histogram,
445                },
446            )
447        }
448        MetricTypeConfig::Summary => {
449            let value = value.to_string_lossy().parse().map_err(|error| {
450                TransformError::ParseFloatError {
451                    path: field.to_string(),
452                    error,
453                }
454            })?;
455
456            (
457                MetricKind::Incremental,
458                MetricValue::Distribution {
459                    samples: vector_lib::samples![value => 1],
460                    statistic: StatisticKind::Summary,
461                },
462            )
463        }
464        MetricTypeConfig::Gauge => {
465            let value = value.to_string_lossy().parse().map_err(|error| {
466                TransformError::ParseFloatError {
467                    path: field.to_string(),
468                    error,
469                }
470            })?;
471
472            (MetricKind::Absolute, MetricValue::Gauge { value })
473        }
474        MetricTypeConfig::Set => {
475            let value = value.to_string_lossy().into_owned();
476
477            (
478                MetricKind::Incremental,
479                MetricValue::Set {
480                    values: std::iter::once(value).collect(),
481                },
482            )
483        }
484    };
485    Ok(Metric::new_with_metadata(name, kind, value, metadata)
486        .with_namespace(namespace)
487        .with_tags(tags)
488        .with_timestamp(timestamp))
489}
490
491fn bytes_to_str(value: &Value) -> Option<String> {
492    match value {
493        Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
494        _ => None,
495    }
496}
497
498fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
499    // TODO: update returned errors after `TransformError` is refactored.
500    let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
501        PathParseError::InvalidPathSyntax { path } => PathNotFound {
502            path: path.to_string(),
503        },
504    })?;
505    match maybe_value {
506        None => Err(PathNotFound {
507            path: path.to_string(),
508        }),
509        Some(v) => Ok(bytes_to_str(v)),
510    }
511}
512
513fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
514    let counter_value = log
515        .get(event_path!("counter", "value"))
516        .ok_or_else(|| TransformError::PathNotFound {
517            path: "counter.value".to_string(),
518        })?
519        .as_float()
520        .ok_or_else(|| TransformError::ParseError {
521            path: "counter.value".to_string(),
522            kind: TransformParseErrorKind::FloatError,
523        })?;
524
525    Ok(MetricValue::Counter {
526        value: *counter_value,
527    })
528}
529
530fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
531    let gauge_value = log
532        .get(event_path!("gauge", "value"))
533        .ok_or_else(|| TransformError::PathNotFound {
534            path: "gauge.value".to_string(),
535        })?
536        .as_float()
537        .ok_or_else(|| TransformError::ParseError {
538            path: "gauge.value".to_string(),
539            kind: TransformParseErrorKind::FloatError,
540        })?;
541    Ok(MetricValue::Gauge {
542        value: *gauge_value,
543    })
544}
545
546fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
547    let set_values = log
548        .get(event_path!("set", "values"))
549        .ok_or_else(|| TransformError::PathNotFound {
550            path: "set.values".to_string(),
551        })?
552        .as_array()
553        .ok_or_else(|| TransformError::ParseError {
554            path: "set.values".to_string(),
555            kind: TransformParseErrorKind::ArrayError,
556        })?;
557
558    let mut values: Vec<String> = Vec::new();
559    for e_value in set_values {
560        let value = e_value
561            .as_bytes()
562            .ok_or_else(|| TransformError::ParseError {
563                path: "set.values".to_string(),
564                kind: TransformParseErrorKind::ArrayError,
565            })?;
566        values.push(String::from_utf8_lossy(value).to_string());
567    }
568
569    Ok(MetricValue::Set {
570        values: values.into_iter().collect(),
571    })
572}
573
574fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
575    let event_samples = log
576        .get(event_path!("distribution", "samples"))
577        .ok_or_else(|| TransformError::PathNotFound {
578            path: "distribution.samples".to_string(),
579        })?
580        .as_array()
581        .ok_or_else(|| TransformError::ParseError {
582            path: "distribution.samples".to_string(),
583            kind: TransformParseErrorKind::ArrayError,
584        })?;
585
586    let mut samples: Vec<Sample> = Vec::new();
587    for e_sample in event_samples {
588        let value = e_sample
589            .get(path!("value"))
590            .ok_or_else(|| TransformError::PathNotFound {
591                path: "value".to_string(),
592            })?
593            .as_float()
594            .ok_or_else(|| TransformError::ParseError {
595                path: "value".to_string(),
596                kind: TransformParseErrorKind::FloatError,
597            })?;
598
599        let rate = e_sample
600            .get(path!("rate"))
601            .ok_or_else(|| TransformError::PathNotFound {
602                path: "rate".to_string(),
603            })?
604            .as_integer()
605            .ok_or_else(|| TransformError::ParseError {
606                path: "rate".to_string(),
607                kind: TransformParseErrorKind::IntError,
608            })?;
609
610        samples.push(Sample {
611            value: *value,
612            rate: rate as u32,
613        });
614    }
615
616    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
617        Some(n) => n,
618        None => {
619            return Err(TransformError::PathNotFound {
620                path: "distribution.statistic".to_string(),
621            });
622        }
623    };
624    let statistic_kind = match statistic_str.as_str() {
625        "histogram" => Ok(StatisticKind::Histogram),
626        "summary" => Ok(StatisticKind::Summary),
627        _ => Err(TransformError::MetricValueError {
628            path: "distribution.statistic".to_string(),
629            path_value: statistic_str.to_string(),
630        }),
631    }?;
632
633    Ok(MetricValue::Distribution {
634        samples,
635        statistic: statistic_kind,
636    })
637}
638
639fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
640    let event_buckets = log
641        .get(event_path!("histogram", "buckets"))
642        .ok_or_else(|| TransformError::PathNotFound {
643            path: "histogram.buckets".to_string(),
644        })?
645        .as_array()
646        .ok_or_else(|| TransformError::ParseError {
647            path: "histogram.buckets".to_string(),
648            kind: TransformParseErrorKind::ArrayError,
649        })?;
650
651    let mut buckets: Vec<Bucket> = Vec::new();
652    for e_bucket in event_buckets {
653        let upper_limit = e_bucket
654            .get(path!("upper_limit"))
655            .ok_or_else(|| TransformError::PathNotFound {
656                path: "histogram.buckets.upper_limit".to_string(),
657            })?
658            .as_float()
659            .ok_or_else(|| TransformError::ParseError {
660                path: "histogram.buckets.upper_limit".to_string(),
661                kind: TransformParseErrorKind::FloatError,
662            })?;
663
664        let count = e_bucket
665            .get(path!("count"))
666            .ok_or_else(|| TransformError::PathNotFound {
667                path: "histogram.buckets.count".to_string(),
668            })?
669            .as_integer()
670            .ok_or_else(|| TransformError::ParseError {
671                path: "histogram.buckets.count".to_string(),
672                kind: TransformParseErrorKind::IntError,
673            })?;
674
675        buckets.push(Bucket {
676            upper_limit: *upper_limit,
677            count: count as u64,
678        });
679    }
680
681    let count = log
682        .get(event_path!("histogram", "count"))
683        .ok_or_else(|| TransformError::PathNotFound {
684            path: "histogram.count".to_string(),
685        })?
686        .as_integer()
687        .ok_or_else(|| TransformError::ParseError {
688            path: "histogram.count".to_string(),
689            kind: TransformParseErrorKind::IntError,
690        })?;
691
692    let sum = log
693        .get(event_path!("histogram", "sum"))
694        .ok_or_else(|| TransformError::PathNotFound {
695            path: "histogram.sum".to_string(),
696        })?
697        .as_float()
698        .ok_or_else(|| TransformError::ParseError {
699            path: "histogram.sum".to_string(),
700            kind: TransformParseErrorKind::FloatError,
701        })?;
702
703    Ok(MetricValue::AggregatedHistogram {
704        buckets,
705        count: count as u64,
706        sum: *sum,
707    })
708}
709
710fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
711    let event_quantiles = log
712        .get(event_path!("summary", "quantiles"))
713        .ok_or_else(|| TransformError::PathNotFound {
714            path: "summary.quantiles".to_string(),
715        })?
716        .as_array()
717        .ok_or_else(|| TransformError::ParseError {
718            path: "summary.quantiles".to_string(),
719            kind: TransformParseErrorKind::ArrayError,
720        })?;
721
722    let mut quantiles: Vec<Quantile> = Vec::new();
723    for e_quantile in event_quantiles {
724        let quantile = e_quantile
725            .get(path!("quantile"))
726            .ok_or_else(|| TransformError::PathNotFound {
727                path: "summary.quantiles.quantile".to_string(),
728            })?
729            .as_float()
730            .ok_or_else(|| TransformError::ParseError {
731                path: "summary.quantiles.quantile".to_string(),
732                kind: TransformParseErrorKind::FloatError,
733            })?;
734
735        let value = e_quantile
736            .get(path!("value"))
737            .ok_or_else(|| TransformError::PathNotFound {
738                path: "summary.quantiles.value".to_string(),
739            })?
740            .as_float()
741            .ok_or_else(|| TransformError::ParseError {
742                path: "summary.quantiles.value".to_string(),
743                kind: TransformParseErrorKind::FloatError,
744            })?;
745
746        quantiles.push(Quantile {
747            quantile: *quantile,
748            value: *value,
749        })
750    }
751
752    let count = log
753        .get(event_path!("summary", "count"))
754        .ok_or_else(|| TransformError::PathNotFound {
755            path: "summary.count".to_string(),
756        })?
757        .as_integer()
758        .ok_or_else(|| TransformError::ParseError {
759            path: "summary.count".to_string(),
760            kind: TransformParseErrorKind::IntError,
761        })?;
762
763    let sum = log
764        .get(event_path!("summary", "sum"))
765        .ok_or_else(|| TransformError::PathNotFound {
766            path: "summary.sum".to_string(),
767        })?
768        .as_float()
769        .ok_or_else(|| TransformError::ParseError {
770            path: "summary.sum".to_string(),
771            kind: TransformParseErrorKind::FloatError,
772        })?;
773
774    Ok(MetricValue::AggregatedSummary {
775        quantiles,
776        count: count as u64,
777        sum: *sum,
778    })
779}
780
781fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
782    let log = event.as_log();
783    let timestamp = log
784        .get_timestamp()
785        .and_then(Value::as_timestamp)
786        .cloned()
787        .or_else(|| Some(Utc::now()));
788
789    let name = match try_get_string_from_log(log, "name")? {
790        Some(n) => n,
791        None => {
792            return Err(TransformError::PathNotFound {
793                path: "name".to_string(),
794            });
795        }
796    };
797
798    let mut tags = MetricTags::default();
799
800    if let Some(els) = log.get(event_path!("tags"))
801        && let Some(el) = els.as_object()
802    {
803        for (key, value) in el {
804            tags.insert(key.to_string(), bytes_to_str(value));
805        }
806    }
807    let tags_result = Some(tags);
808
809    let kind_str = match try_get_string_from_log(log, "kind")? {
810        Some(n) => n,
811        None => {
812            return Err(TransformError::PathNotFound {
813                path: "kind".to_string(),
814            });
815        }
816    };
817
818    let kind = match kind_str.as_str() {
819        "absolute" => Ok(MetricKind::Absolute),
820        "incremental" => Ok(MetricKind::Incremental),
821        value => Err(TransformError::MetricValueError {
822            path: "kind".to_string(),
823            path_value: value.to_string(),
824        }),
825    }?;
826
827    let mut value: Option<MetricValue> = None;
828    if let Some(root_event) = log.as_map() {
829        for key in root_event.keys() {
830            value = match key.as_str() {
831                "gauge" => Some(get_gauge_value(log)?),
832                "distribution" => Some(get_distribution_value(log)?),
833                "histogram" => Some(get_histogram_value(log)?),
834                "summary" => Some(get_summary_value(log)?),
835                "counter" => Some(get_counter_value(log)?),
836                "set" => Some(get_set_value(log)?),
837                _ => None,
838            };
839
840            if value.is_some() {
841                break;
842            }
843        }
844    }
845
846    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
847
848    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
849        .with_tags(tags_result)
850        .with_timestamp(timestamp);
851
852    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
853        metric = metric.with_namespace(namespace);
854    }
855
856    Ok(metric)
857}
858
859impl FunctionTransform for LogToMetric {
860    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
861        // Metrics are "all or none" for a specific log. If a single fails, none are produced.
862        let mut buffer = Vec::with_capacity(self.metrics.len());
863        if self.all_metrics {
864            match to_metrics(&event) {
865                Ok(metric) => {
866                    output.push(Event::Metric(metric));
867                }
868                Err(err) => {
869                    match err {
870                        TransformError::MetricValueError { path, path_value } => {
871                            emit!(MetricMetadataInvalidFieldValueError {
872                                field: path.as_ref(),
873                                field_value: path_value.as_ref()
874                            })
875                        }
876                        TransformError::PathNotFound { path } => {
877                            emit!(ParserMissingFieldError::<DROP_EVENT> {
878                                field: path.as_ref()
879                            })
880                        }
881                        TransformError::ParseError { path, kind } => {
882                            emit!(MetricMetadataParseError {
883                                field: path.as_ref(),
884                                kind: &kind.to_string(),
885                            })
886                        }
887                        TransformError::MetricDetailsNotFound => {
888                            emit!(MetricMetadataMetricDetailsNotFoundError {})
889                        }
890                        TransformError::PairExpansionError { key, value, error } => {
891                            emit!(crate::internal_events::PairExpansionError {
892                                key: &key,
893                                value: &value,
894                                drop_event: true,
895                                error
896                            })
897                        }
898                        _ => {}
899                    };
900                }
901            }
902        } else {
903            for config in self.metrics.iter() {
904                match to_metric_with_config(config, &event) {
905                    Ok(metric) => {
906                        buffer.push(Event::Metric(metric));
907                    }
908                    Err(err) => {
909                        match err {
910                            TransformError::PathNull { path } => {
911                                emit!(LogToMetricFieldNullError {
912                                    field: path.as_ref()
913                                })
914                            }
915                            TransformError::PathNotFound { path } => {
916                                emit!(ParserMissingFieldError::<DROP_EVENT> {
917                                    field: path.as_ref()
918                                })
919                            }
920                            TransformError::ParseFloatError { path, error } => {
921                                emit!(LogToMetricParseFloatError {
922                                    field: path.as_ref(),
923                                    error
924                                })
925                            }
926                            TransformError::TemplateRenderingError(error) => {
927                                emit!(crate::internal_events::TemplateRenderingError {
928                                    error,
929                                    drop_event: true,
930                                    field: None,
931                                })
932                            }
933                            TransformError::PairExpansionError { key, value, error } => {
934                                emit!(crate::internal_events::PairExpansionError {
935                                    key: &key,
936                                    value: &value,
937                                    drop_event: true,
938                                    error
939                                })
940                            }
941                            _ => {}
942                        };
943                        // early return to prevent the partial buffer from being sent
944                        return;
945                    }
946                }
947            }
948        }
949
950        // Metric generation was successful, publish them all.
951        for event in buffer {
952            output.push(event);
953        }
954    }
955}
956
957#[cfg(test)]
958mod tests {
959    use std::{sync::Arc, time::Duration};
960
961    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
962    use tokio::sync::mpsc;
963    use tokio_stream::wrappers::ReceiverStream;
964    use vector_lib::{
965        config::ComponentKey,
966        event::{EventMetadata, ObjectMap},
967        metric_tags,
968    };
969
970    use super::*;
971    use crate::{
972        config::log_schema,
973        event::{
974            Event, LogEvent,
975            metric::{Metric, MetricKind, MetricValue, StatisticKind},
976        },
977        test_util::components::assert_transform_compliance,
978        transforms::test::create_topology,
979    };
980
981    #[test]
982    fn generate_config() {
983        crate::test_util::test_generate_config::<LogToMetricConfig>();
984    }
985
986    fn parse_config(s: &str) -> LogToMetricConfig {
987        toml::from_str(s).unwrap()
988    }
989
990    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
991        serde_yaml::from_str(s).unwrap()
992    }
993
994    fn ts() -> DateTime<Utc> {
995        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
996            .single()
997            .and_then(|t| t.with_nanosecond(11))
998            .expect("invalid timestamp")
999    }
1000
1001    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1002        let mut log = Event::Log(LogEvent::from("i am a log"));
1003        log.as_mut_log().insert(key, value);
1004        log.as_mut_log()
1005            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1006        log
1007    }
1008
1009    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1010        assert_transform_compliance(async move {
1011            let (tx, rx) = mpsc::channel(1);
1012            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1013            tx.send(event).await.unwrap();
1014            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1015                .await
1016                .unwrap_or(None);
1017            drop(tx);
1018            topology.stop().await;
1019            assert_eq!(out.recv().await, None);
1020            result
1021        })
1022        .await
1023    }
1024
1025    async fn do_transform_multiple_events(
1026        config: LogToMetricConfig,
1027        event: Event,
1028        count: usize,
1029    ) -> Vec<Event> {
1030        assert_transform_compliance(async move {
1031            let (tx, rx) = mpsc::channel(1);
1032            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1033            tx.send(event).await.unwrap();
1034
1035            let mut results = vec![];
1036            for _ in 0..count {
1037                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1038                    .await
1039                    .unwrap_or(None);
1040                if let Some(event) = result {
1041                    results.push(event);
1042                }
1043            }
1044
1045            drop(tx);
1046            topology.stop().await;
1047            assert_eq!(out.recv().await, None);
1048            results
1049        })
1050        .await
1051    }
1052
1053    #[tokio::test]
1054    async fn count_http_status_codes() {
1055        let config = parse_config(
1056            r#"
1057            [[metrics]]
1058            type = "counter"
1059            field = "status"
1060            "#,
1061        );
1062
1063        let event = create_event("status", "42");
1064        let mut metadata =
1065            event
1066                .metadata()
1067                .clone()
1068                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1069                    None,
1070                    None,
1071                    Some(ORIGIN_SERVICE_VALUE),
1072                ));
1073        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1074        metadata.set_schema_definition(&Arc::new(Definition::any()));
1075        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1076        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1077        let metric = do_transform(config, event).await.unwrap();
1078
1079        assert_eq!(
1080            metric.into_metric(),
1081            Metric::new_with_metadata(
1082                "status",
1083                MetricKind::Incremental,
1084                MetricValue::Counter { value: 1.0 },
1085                metadata,
1086            )
1087            .with_timestamp(Some(ts()))
1088        );
1089    }
1090
1091    #[tokio::test]
1092    async fn count_http_requests_with_tags() {
1093        let config = parse_config(
1094            r#"
1095            [[metrics]]
1096            type = "counter"
1097            field = "message"
1098            name = "http_requests_total"
1099            namespace = "app"
1100            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1101            "#,
1102        );
1103
1104        let mut event = create_event("message", "i am log");
1105        event.as_mut_log().insert("method", "post");
1106        event.as_mut_log().insert("code", "200");
1107        let mut metadata =
1108            event
1109                .metadata()
1110                .clone()
1111                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1112                    None,
1113                    None,
1114                    Some(ORIGIN_SERVICE_VALUE),
1115                ));
1116        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1117        metadata.set_schema_definition(&Arc::new(Definition::any()));
1118        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1119        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1120
1121        let metric = do_transform(config, event).await.unwrap();
1122
1123        assert_eq!(
1124            metric.into_metric(),
1125            Metric::new_with_metadata(
1126                "http_requests_total",
1127                MetricKind::Incremental,
1128                MetricValue::Counter { value: 1.0 },
1129                metadata,
1130            )
1131            .with_namespace(Some("app"))
1132            .with_tags(Some(metric_tags!(
1133                "method" => "post",
1134                "code" => "200",
1135                "host" => "localhost",
1136            )))
1137            .with_timestamp(Some(ts()))
1138        );
1139    }
1140
1141    #[tokio::test]
1142    async fn count_http_requests_with_tags_expansion() {
1143        let config = parse_config(
1144            r#"
1145            [[metrics]]
1146            type = "counter"
1147            field = "message"
1148            name = "http_requests_total"
1149            namespace = "app"
1150            tags = {"*" = "{{ dict }}"}
1151            "#,
1152        );
1153
1154        let mut event = create_event("message", "i am log");
1155        let log = event.as_mut_log();
1156
1157        let mut test_dict = ObjectMap::default();
1158        test_dict.insert("one".into(), Value::from("foo"));
1159        test_dict.insert("two".into(), Value::from("baz"));
1160        log.insert("dict", Value::from(test_dict));
1161
1162        let mut metadata =
1163            event
1164                .metadata()
1165                .clone()
1166                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1167                    None,
1168                    None,
1169                    Some(ORIGIN_SERVICE_VALUE),
1170                ));
1171        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1172        metadata.set_schema_definition(&Arc::new(Definition::any()));
1173        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1174        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1175
1176        let metric = do_transform(config, event).await.unwrap();
1177
1178        assert_eq!(
1179            metric.into_metric(),
1180            Metric::new_with_metadata(
1181                "http_requests_total",
1182                MetricKind::Incremental,
1183                MetricValue::Counter { value: 1.0 },
1184                metadata,
1185            )
1186            .with_namespace(Some("app"))
1187            .with_tags(Some(metric_tags!(
1188                "one" => "foo",
1189                "two" => "baz",
1190            )))
1191            .with_timestamp(Some(ts()))
1192        );
1193    }
1194    #[tokio::test]
1195    async fn count_http_requests_with_colliding_dynamic_tags() {
1196        let config = parse_config(
1197            r#"
1198            [[metrics]]
1199            type = "counter"
1200            field = "message"
1201            name = "http_requests_total"
1202            namespace = "app"
1203            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1204            "#,
1205        );
1206
1207        let mut event = create_event("message", "i am log");
1208        let log = event.as_mut_log();
1209
1210        let mut map1 = ObjectMap::default();
1211        map1.insert("key1".into(), Value::from("val1"));
1212        log.insert("map1", Value::from(map1));
1213
1214        let mut map2 = ObjectMap::default();
1215        map2.insert("l1_key1".into(), Value::from("val2"));
1216        log.insert("map2", Value::from(map2));
1217
1218        let mut metadata =
1219            event
1220                .metadata()
1221                .clone()
1222                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1223                    None,
1224                    None,
1225                    Some(ORIGIN_SERVICE_VALUE),
1226                ));
1227        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1228        metadata.set_schema_definition(&Arc::new(Definition::any()));
1229        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1230        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1231
1232        let metric = do_transform(config, event).await.unwrap().into_metric();
1233        let tags = metric.tags().expect("Metric should have tags");
1234
1235        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1236
1237        assert_eq!(tags.iter_all().count(), 2);
1238        for (name, value) in tags.iter_all() {
1239            assert_eq!(name, "l1_key1");
1240            assert!(value == Some("val1") || value == Some("val2"));
1241        }
1242    }
1243    #[tokio::test]
1244    async fn multi_value_tags_yaml() {
1245        // Have to use YAML to represent bare tags
1246        let config = parse_yaml_config(
1247            r#"
1248            metrics:
1249            - field: "message"
1250              type: "counter"
1251              tags:
1252                tag:
1253                - "one"
1254                - null
1255                - "two"
1256            "#,
1257        );
1258
1259        let event = create_event("message", "I am log");
1260        let metric = do_transform(config, event).await.unwrap().into_metric();
1261        let tags = metric.tags().expect("Metric should have tags");
1262
1263        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1264
1265        assert_eq!(tags.iter_all().count(), 3);
1266        for (name, value) in tags.iter_all() {
1267            assert_eq!(name, "tag");
1268            assert!(value.is_none() || value == Some("one") || value == Some("two"));
1269        }
1270    }
1271    #[tokio::test]
1272    async fn multi_value_tags_expansion_yaml() {
1273        // Have to use YAML to represent bare tags
1274        let config = parse_yaml_config(
1275            r#"
1276            metrics:
1277            - field: "message"
1278              type: "counter"
1279              tags:
1280                "*": "{{dict}}"
1281            "#,
1282        );
1283
1284        let mut event = create_event("message", "I am log");
1285        let log = event.as_mut_log();
1286
1287        let mut test_dict = ObjectMap::default();
1288        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1289        log.insert("dict", Value::from(test_dict));
1290
1291        let metric = do_transform(config, event).await.unwrap().into_metric();
1292        let tags = metric.tags().expect("Metric should have tags");
1293
1294        assert_eq!(
1295            tags.iter_single().collect::<Vec<_>>(),
1296            vec![("one", "[\"foo\",\"baz\"]")]
1297        );
1298
1299        assert_eq!(tags.iter_all().count(), 1);
1300        for (name, value) in tags.iter_all() {
1301            assert_eq!(name, "one");
1302            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1303        }
1304    }
1305
1306    #[tokio::test]
1307    async fn multi_value_tags_toml() {
1308        let config = parse_config(
1309            r#"
1310            [[metrics]]
1311            field = "message"
1312            type = "counter"
1313            [metrics.tags]
1314            tag = ["one", "two"]
1315            "#,
1316        );
1317
1318        let event = create_event("message", "I am log");
1319        let metric = do_transform(config, event).await.unwrap().into_metric();
1320        let tags = metric.tags().expect("Metric should have tags");
1321
1322        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1323
1324        assert_eq!(tags.iter_all().count(), 2);
1325        for (name, value) in tags.iter_all() {
1326            assert_eq!(name, "tag");
1327            assert!(value == Some("one") || value == Some("two"));
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn count_exceptions() {
1333        let config = parse_config(
1334            r#"
1335            [[metrics]]
1336            type = "counter"
1337            field = "backtrace"
1338            name = "exception_total"
1339            "#,
1340        );
1341
1342        let event = create_event("backtrace", "message");
1343        let mut metadata =
1344            event
1345                .metadata()
1346                .clone()
1347                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1348                    None,
1349                    None,
1350                    Some(ORIGIN_SERVICE_VALUE),
1351                ));
1352        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1353        metadata.set_schema_definition(&Arc::new(Definition::any()));
1354        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1355        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1356
1357        let metric = do_transform(config, event).await.unwrap();
1358
1359        assert_eq!(
1360            metric.into_metric(),
1361            Metric::new_with_metadata(
1362                "exception_total",
1363                MetricKind::Incremental,
1364                MetricValue::Counter { value: 1.0 },
1365                metadata
1366            )
1367            .with_timestamp(Some(ts()))
1368        );
1369    }
1370
1371    #[tokio::test]
1372    async fn count_exceptions_no_match() {
1373        let config = parse_config(
1374            r#"
1375            [[metrics]]
1376            type = "counter"
1377            field = "backtrace"
1378            name = "exception_total"
1379            "#,
1380        );
1381
1382        let event = create_event("success", "42");
1383        assert_eq!(do_transform(config, event).await, None);
1384    }
1385
1386    #[tokio::test]
1387    async fn sum_order_amounts() {
1388        let config = parse_config(
1389            r#"
1390            [[metrics]]
1391            type = "counter"
1392            field = "amount"
1393            name = "amount_total"
1394            increment_by_value = true
1395            "#,
1396        );
1397
1398        let event = create_event("amount", "33.99");
1399        let mut metadata =
1400            event
1401                .metadata()
1402                .clone()
1403                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1404                    None,
1405                    None,
1406                    Some(ORIGIN_SERVICE_VALUE),
1407                ));
1408        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1409        metadata.set_schema_definition(&Arc::new(Definition::any()));
1410        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1411        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1412        let metric = do_transform(config, event).await.unwrap();
1413
1414        assert_eq!(
1415            metric.into_metric(),
1416            Metric::new_with_metadata(
1417                "amount_total",
1418                MetricKind::Incremental,
1419                MetricValue::Counter { value: 33.99 },
1420                metadata,
1421            )
1422            .with_timestamp(Some(ts()))
1423        );
1424    }
1425
1426    #[tokio::test]
1427    async fn count_absolute() {
1428        let config = parse_config(
1429            r#"
1430            [[metrics]]
1431            type = "counter"
1432            field = "amount"
1433            name = "amount_total"
1434            increment_by_value = true
1435            kind = "absolute"
1436            "#,
1437        );
1438
1439        let event = create_event("amount", "33.99");
1440        let mut metadata =
1441            event
1442                .metadata()
1443                .clone()
1444                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1445                    None,
1446                    None,
1447                    Some(ORIGIN_SERVICE_VALUE),
1448                ));
1449        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1450        metadata.set_schema_definition(&Arc::new(Definition::any()));
1451        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1452        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1453
1454        let metric = do_transform(config, event).await.unwrap();
1455
1456        assert_eq!(
1457            metric.into_metric(),
1458            Metric::new_with_metadata(
1459                "amount_total",
1460                MetricKind::Absolute,
1461                MetricValue::Counter { value: 33.99 },
1462                metadata,
1463            )
1464            .with_timestamp(Some(ts()))
1465        );
1466    }
1467
1468    #[tokio::test]
1469    async fn memory_usage_gauge() {
1470        let config = parse_config(
1471            r#"
1472            [[metrics]]
1473            type = "gauge"
1474            field = "memory_rss"
1475            name = "memory_rss_bytes"
1476            "#,
1477        );
1478
1479        let event = create_event("memory_rss", "123");
1480        let mut metadata =
1481            event
1482                .metadata()
1483                .clone()
1484                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1485                    None,
1486                    None,
1487                    Some(ORIGIN_SERVICE_VALUE),
1488                ));
1489
1490        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1491        metadata.set_schema_definition(&Arc::new(Definition::any()));
1492
1493        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1494        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1495
1496        let metric = do_transform(config, event).await.unwrap();
1497
1498        assert_eq!(
1499            metric.into_metric(),
1500            Metric::new_with_metadata(
1501                "memory_rss_bytes",
1502                MetricKind::Absolute,
1503                MetricValue::Gauge { value: 123.0 },
1504                metadata,
1505            )
1506            .with_timestamp(Some(ts()))
1507        );
1508    }
1509
1510    #[tokio::test]
1511    async fn parse_failure() {
1512        let config = parse_config(
1513            r#"
1514            [[metrics]]
1515            type = "counter"
1516            field = "status"
1517            name = "status_total"
1518            increment_by_value = true
1519            "#,
1520        );
1521
1522        let event = create_event("status", "not a number");
1523        assert_eq!(do_transform(config, event).await, None);
1524    }
1525
1526    #[tokio::test]
1527    async fn missing_field() {
1528        let config = parse_config(
1529            r#"
1530            [[metrics]]
1531            type = "counter"
1532            field = "status"
1533            name = "status_total"
1534            "#,
1535        );
1536
1537        let event = create_event("not foo", "not a number");
1538        assert_eq!(do_transform(config, event).await, None);
1539    }
1540
1541    #[tokio::test]
1542    async fn null_field() {
1543        let config = parse_config(
1544            r#"
1545            [[metrics]]
1546            type = "counter"
1547            field = "status"
1548            name = "status_total"
1549            "#,
1550        );
1551
1552        let event = create_event("status", Value::Null);
1553        assert_eq!(do_transform(config, event).await, None);
1554    }
1555
1556    #[tokio::test]
1557    async fn multiple_metrics() {
1558        let config = parse_config(
1559            r#"
1560            [[metrics]]
1561            type = "counter"
1562            field = "status"
1563
1564            [[metrics]]
1565            type = "counter"
1566            field = "backtrace"
1567            name = "exception_total"
1568            "#,
1569        );
1570
1571        let mut event = Event::Log(LogEvent::from("i am a log"));
1572        event
1573            .as_mut_log()
1574            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1575        event.as_mut_log().insert("status", "42");
1576        event.as_mut_log().insert("backtrace", "message");
1577        let mut metadata =
1578            event
1579                .metadata()
1580                .clone()
1581                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1582                    None,
1583                    None,
1584                    Some(ORIGIN_SERVICE_VALUE),
1585                ));
1586
1587        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1588        metadata.set_schema_definition(&Arc::new(Definition::any()));
1589        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1590        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1591
1592        let output = do_transform_multiple_events(config, event, 2).await;
1593
1594        assert_eq!(2, output.len());
1595        assert_eq!(
1596            output[0].clone().into_metric(),
1597            Metric::new_with_metadata(
1598                "status",
1599                MetricKind::Incremental,
1600                MetricValue::Counter { value: 1.0 },
1601                metadata.clone(),
1602            )
1603            .with_timestamp(Some(ts()))
1604        );
1605        assert_eq!(
1606            output[1].clone().into_metric(),
1607            Metric::new_with_metadata(
1608                "exception_total",
1609                MetricKind::Incremental,
1610                MetricValue::Counter { value: 1.0 },
1611                metadata,
1612            )
1613            .with_timestamp(Some(ts()))
1614        );
1615    }
1616
1617    #[tokio::test]
1618    async fn multiple_metrics_with_multiple_templates() {
1619        let config = parse_config(
1620            r#"
1621            [[metrics]]
1622            type = "set"
1623            field = "status"
1624            name = "{{host}}_{{worker}}_status_set"
1625
1626            [[metrics]]
1627            type = "counter"
1628            field = "backtrace"
1629            name = "{{service}}_exception_total"
1630            namespace = "{{host}}"
1631            "#,
1632        );
1633
1634        let mut event = Event::Log(LogEvent::from("i am a log"));
1635        event
1636            .as_mut_log()
1637            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1638        event.as_mut_log().insert("status", "42");
1639        event.as_mut_log().insert("backtrace", "message");
1640        event.as_mut_log().insert("host", "local");
1641        event.as_mut_log().insert("worker", "abc");
1642        event.as_mut_log().insert("service", "xyz");
1643        let mut metadata =
1644            event
1645                .metadata()
1646                .clone()
1647                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1648                    None,
1649                    None,
1650                    Some(ORIGIN_SERVICE_VALUE),
1651                ));
1652
1653        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1654        metadata.set_schema_definition(&Arc::new(Definition::any()));
1655        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1656        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1657
1658        let output = do_transform_multiple_events(config, event, 2).await;
1659
1660        assert_eq!(2, output.len());
1661        assert_eq!(
1662            output[0].as_metric(),
1663            &Metric::new_with_metadata(
1664                "local_abc_status_set",
1665                MetricKind::Incremental,
1666                MetricValue::Set {
1667                    values: vec!["42".into()].into_iter().collect()
1668                },
1669                metadata.clone(),
1670            )
1671            .with_timestamp(Some(ts()))
1672        );
1673        assert_eq!(
1674            output[1].as_metric(),
1675            &Metric::new_with_metadata(
1676                "xyz_exception_total",
1677                MetricKind::Incremental,
1678                MetricValue::Counter { value: 1.0 },
1679                metadata,
1680            )
1681            .with_namespace(Some("local"))
1682            .with_timestamp(Some(ts()))
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn user_ip_set() {
1688        let config = parse_config(
1689            r#"
1690            [[metrics]]
1691            type = "set"
1692            field = "user_ip"
1693            name = "unique_user_ip"
1694            "#,
1695        );
1696
1697        let event = create_event("user_ip", "1.2.3.4");
1698        let mut metadata =
1699            event
1700                .metadata()
1701                .clone()
1702                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1703                    None,
1704                    None,
1705                    Some(ORIGIN_SERVICE_VALUE),
1706                ));
1707        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1708        metadata.set_schema_definition(&Arc::new(Definition::any()));
1709        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1710        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1711
1712        let metric = do_transform(config, event).await.unwrap();
1713
1714        assert_eq!(
1715            metric.into_metric(),
1716            Metric::new_with_metadata(
1717                "unique_user_ip",
1718                MetricKind::Incremental,
1719                MetricValue::Set {
1720                    values: vec!["1.2.3.4".into()].into_iter().collect()
1721                },
1722                metadata,
1723            )
1724            .with_timestamp(Some(ts()))
1725        );
1726    }
1727
1728    #[tokio::test]
1729    async fn response_time_histogram() {
1730        let config = parse_config(
1731            r#"
1732            [[metrics]]
1733            type = "histogram"
1734            field = "response_time"
1735            "#,
1736        );
1737
1738        let event = create_event("response_time", "2.5");
1739        let mut metadata =
1740            event
1741                .metadata()
1742                .clone()
1743                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1744                    None,
1745                    None,
1746                    Some(ORIGIN_SERVICE_VALUE),
1747                ));
1748
1749        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1750        metadata.set_schema_definition(&Arc::new(Definition::any()));
1751        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1752        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1753
1754        let metric = do_transform(config, event).await.unwrap();
1755
1756        assert_eq!(
1757            metric.into_metric(),
1758            Metric::new_with_metadata(
1759                "response_time",
1760                MetricKind::Incremental,
1761                MetricValue::Distribution {
1762                    samples: vector_lib::samples![2.5 => 1],
1763                    statistic: StatisticKind::Histogram
1764                },
1765                metadata
1766            )
1767            .with_timestamp(Some(ts()))
1768        );
1769    }
1770
1771    #[tokio::test]
1772    async fn response_time_summary() {
1773        let config = parse_config(
1774            r#"
1775            [[metrics]]
1776            type = "summary"
1777            field = "response_time"
1778            "#,
1779        );
1780
1781        let event = create_event("response_time", "2.5");
1782        let mut metadata =
1783            event
1784                .metadata()
1785                .clone()
1786                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1787                    None,
1788                    None,
1789                    Some(ORIGIN_SERVICE_VALUE),
1790                ));
1791
1792        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1793        metadata.set_schema_definition(&Arc::new(Definition::any()));
1794        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1795        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1796
1797        let metric = do_transform(config, event).await.unwrap();
1798
1799        assert_eq!(
1800            metric.into_metric(),
1801            Metric::new_with_metadata(
1802                "response_time",
1803                MetricKind::Incremental,
1804                MetricValue::Distribution {
1805                    samples: vector_lib::samples![2.5 => 1],
1806                    statistic: StatisticKind::Summary
1807                },
1808                metadata
1809            )
1810            .with_timestamp(Some(ts()))
1811        );
1812    }
1813
    // Metric metadata tests: log events that already describe a metric are
    // converted with `all_metrics` enabled.
1816    fn create_log_event(json_str: &str) -> Event {
1817        create_log_event_with_namespace(json_str, Some("test_namespace"))
1818    }
1819
1820    fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1821        let mut log_value: Value =
1822            serde_json::from_str(json_str).expect("JSON was not well-formatted");
1823        log_value.insert("timestamp", ts());
1824
1825        if let Some(namespace) = namespace {
1826            log_value.insert("namespace", namespace);
1827        }
1828
1829        let mut metadata = EventMetadata::default();
1830        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1831        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1832
1833        Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1834    }
1835
1836    #[tokio::test]
1837    async fn transform_gauge() {
1838        let config = LogToMetricConfig {
1839            metrics: None,
1840            all_metrics: Some(true),
1841        };
1842
1843        let json_str = r#"{
1844          "gauge": {
1845            "value": 990.0
1846          },
1847          "kind": "absolute",
1848          "name": "test.transform.gauge",
1849          "tags": {
1850            "env": "test_env",
1851            "host": "localhost"
1852          }
1853        }"#;
1854        let log = create_log_event(json_str);
1855        let metric = do_transform(config, log.clone()).await.unwrap();
1856        assert_eq!(
1857            *metric.as_metric(),
1858            Metric::new_with_metadata(
1859                "test.transform.gauge",
1860                MetricKind::Absolute,
1861                MetricValue::Gauge { value: 990.0 },
1862                metric.metadata().clone(),
1863            )
1864            .with_namespace(Some("test_namespace"))
1865            .with_tags(Some(metric_tags!(
1866                "env" => "test_env",
1867                "host" => "localhost",
1868            )))
1869            .with_timestamp(Some(ts()))
1870        );
1871    }
1872
1873    #[tokio::test]
1874    async fn transform_histogram() {
1875        let config = LogToMetricConfig {
1876            metrics: None,
1877            all_metrics: Some(true),
1878        };
1879
1880        let json_str = r#"{
1881          "histogram": {
1882            "sum": 18.0,
1883            "count": 5,
1884            "buckets": [
1885              {
1886                "upper_limit": 1.0,
1887                "count": 1
1888              },
1889              {
1890                "upper_limit": 2.0,
1891                "count": 2
1892              },
1893              {
1894                "upper_limit": 5.0,
1895                "count": 1
1896              },
1897              {
1898                "upper_limit": 10.0,
1899                "count": 1
1900              }
1901            ]
1902          },
1903          "kind": "absolute",
1904          "name": "test.transform.histogram",
1905          "tags": {
1906            "env": "test_env",
1907            "host": "localhost"
1908          }
1909        }"#;
1910        let log = create_log_event(json_str);
1911        let metric = do_transform(config, log.clone()).await.unwrap();
1912        assert_eq!(
1913            *metric.as_metric(),
1914            Metric::new_with_metadata(
1915                "test.transform.histogram",
1916                MetricKind::Absolute,
1917                MetricValue::AggregatedHistogram {
1918                    count: 5,
1919                    sum: 18.0,
1920                    buckets: vec![
1921                        Bucket {
1922                            upper_limit: 1.0,
1923                            count: 1,
1924                        },
1925                        Bucket {
1926                            upper_limit: 2.0,
1927                            count: 2,
1928                        },
1929                        Bucket {
1930                            upper_limit: 5.0,
1931                            count: 1,
1932                        },
1933                        Bucket {
1934                            upper_limit: 10.0,
1935                            count: 1,
1936                        },
1937                    ],
1938                },
1939                metric.metadata().clone(),
1940            )
1941            .with_namespace(Some("test_namespace"))
1942            .with_tags(Some(metric_tags!(
1943                "env" => "test_env",
1944                "host" => "localhost",
1945            )))
1946            .with_timestamp(Some(ts()))
1947        );
1948    }
1949
1950    #[tokio::test]
1951    async fn transform_distribution_histogram() {
1952        let config = LogToMetricConfig {
1953            metrics: None,
1954            all_metrics: Some(true),
1955        };
1956
1957        let json_str = r#"{
1958          "distribution": {
1959            "samples": [
1960              {
1961                "value": 1.0,
1962                "rate": 1
1963              },
1964              {
1965                "value": 2.0,
1966                "rate": 2
1967              }
1968            ],
1969            "statistic": "histogram"
1970          },
1971          "kind": "absolute",
1972          "name": "test.transform.distribution_histogram",
1973          "tags": {
1974            "env": "test_env",
1975            "host": "localhost"
1976          }
1977        }"#;
1978        let log = create_log_event(json_str);
1979        let metric = do_transform(config, log.clone()).await.unwrap();
1980        assert_eq!(
1981            *metric.as_metric(),
1982            Metric::new_with_metadata(
1983                "test.transform.distribution_histogram",
1984                MetricKind::Absolute,
1985                MetricValue::Distribution {
1986                    samples: vec![
1987                        Sample {
1988                            value: 1.0,
1989                            rate: 1
1990                        },
1991                        Sample {
1992                            value: 2.0,
1993                            rate: 2
1994                        },
1995                    ],
1996                    statistic: StatisticKind::Histogram,
1997                },
1998                metric.metadata().clone(),
1999            )
2000            .with_namespace(Some("test_namespace"))
2001            .with_tags(Some(metric_tags!(
2002                "env" => "test_env",
2003                "host" => "localhost",
2004            )))
2005            .with_timestamp(Some(ts()))
2006        );
2007    }
2008
2009    #[tokio::test]
2010    async fn transform_distribution_summary() {
2011        let config = LogToMetricConfig {
2012            metrics: None,
2013            all_metrics: Some(true),
2014        };
2015
2016        let json_str = r#"{
2017          "distribution": {
2018            "samples": [
2019              {
2020                "value": 1.0,
2021                "rate": 1
2022              },
2023              {
2024                "value": 2.0,
2025                "rate": 2
2026              }
2027            ],
2028            "statistic": "summary"
2029          },
2030          "kind": "absolute",
2031          "name": "test.transform.distribution_summary",
2032          "tags": {
2033            "env": "test_env",
2034            "host": "localhost"
2035          }
2036        }"#;
2037        let log = create_log_event(json_str);
2038        let metric = do_transform(config, log.clone()).await.unwrap();
2039        assert_eq!(
2040            *metric.as_metric(),
2041            Metric::new_with_metadata(
2042                "test.transform.distribution_summary",
2043                MetricKind::Absolute,
2044                MetricValue::Distribution {
2045                    samples: vec![
2046                        Sample {
2047                            value: 1.0,
2048                            rate: 1
2049                        },
2050                        Sample {
2051                            value: 2.0,
2052                            rate: 2
2053                        },
2054                    ],
2055                    statistic: StatisticKind::Summary,
2056                },
2057                metric.metadata().clone(),
2058            )
2059            .with_namespace(Some("test_namespace"))
2060            .with_tags(Some(metric_tags!(
2061                "env" => "test_env",
2062                "host" => "localhost",
2063            )))
2064            .with_timestamp(Some(ts()))
2065        );
2066    }
2067
2068    #[tokio::test]
2069    async fn transform_summary() {
2070        let config = LogToMetricConfig {
2071            metrics: None,
2072            all_metrics: Some(true),
2073        };
2074
2075        let json_str = r#"{
2076          "summary": {
2077            "sum": 100.0,
2078            "count": 7,
2079            "quantiles": [
2080              {
2081                "quantile": 0.05,
2082                "value": 10.0
2083              },
2084              {
2085                "quantile": 0.95,
2086                "value": 25.0
2087              }
2088            ]
2089          },
2090          "kind": "absolute",
2091          "name": "test.transform.histogram",
2092          "tags": {
2093            "env": "test_env",
2094            "host": "localhost"
2095          }
2096        }"#;
2097        let log = create_log_event(json_str);
2098        let metric = do_transform(config, log.clone()).await.unwrap();
2099        assert_eq!(
2100            *metric.as_metric(),
2101            Metric::new_with_metadata(
2102                "test.transform.histogram",
2103                MetricKind::Absolute,
2104                MetricValue::AggregatedSummary {
2105                    quantiles: vec![
2106                        Quantile {
2107                            quantile: 0.05,
2108                            value: 10.0,
2109                        },
2110                        Quantile {
2111                            quantile: 0.95,
2112                            value: 25.0,
2113                        },
2114                    ],
2115                    count: 7,
2116                    sum: 100.0,
2117                },
2118                metric.metadata().clone(),
2119            )
2120            .with_namespace(Some("test_namespace"))
2121            .with_tags(Some(metric_tags!(
2122                "env" => "test_env",
2123                "host" => "localhost",
2124            )))
2125            .with_timestamp(Some(ts()))
2126        );
2127    }
2128
2129    #[tokio::test]
2130    async fn transform_counter() {
2131        let config = LogToMetricConfig {
2132            metrics: None,
2133            all_metrics: Some(true),
2134        };
2135
2136        let json_str = r#"{
2137          "counter": {
2138            "value": 10.0
2139          },
2140          "kind": "incremental",
2141          "name": "test.transform.counter",
2142          "tags": {
2143            "env": "test_env",
2144            "host": "localhost"
2145          }
2146        }"#;
2147        let log = create_log_event(json_str);
2148        let metric = do_transform(config, log.clone()).await.unwrap();
2149        assert_eq!(
2150            *metric.as_metric(),
2151            Metric::new_with_metadata(
2152                "test.transform.counter",
2153                MetricKind::Incremental,
2154                MetricValue::Counter { value: 10.0 },
2155                metric.metadata().clone(),
2156            )
2157            .with_namespace(Some("test_namespace"))
2158            .with_tags(Some(metric_tags!(
2159                "env" => "test_env",
2160                "host" => "localhost",
2161            )))
2162            .with_timestamp(Some(ts()))
2163        );
2164    }
2165
2166    #[tokio::test]
2167    async fn transform_set() {
2168        let config = LogToMetricConfig {
2169            metrics: None,
2170            all_metrics: Some(true),
2171        };
2172
2173        let json_str = r#"{
2174          "set": {
2175            "values": ["990.0", "1234"]
2176          },
2177          "kind": "incremental",
2178          "name": "test.transform.set",
2179          "tags": {
2180            "env": "test_env",
2181            "host": "localhost"
2182          }
2183        }"#;
2184        let log = create_log_event(json_str);
2185        let metric = do_transform(config, log.clone()).await.unwrap();
2186        assert_eq!(
2187            *metric.as_metric(),
2188            Metric::new_with_metadata(
2189                "test.transform.set",
2190                MetricKind::Incremental,
2191                MetricValue::Set {
2192                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2193                },
2194                metric.metadata().clone(),
2195            )
2196            .with_namespace(Some("test_namespace"))
2197            .with_tags(Some(metric_tags!(
2198                "env" => "test_env",
2199                "host" => "localhost",
2200            )))
2201            .with_timestamp(Some(ts()))
2202        );
2203    }
2204
2205    #[tokio::test]
2206    async fn transform_all_metrics_optional_namespace() {
2207        let config = LogToMetricConfig {
2208            metrics: None,
2209            all_metrics: Some(true),
2210        };
2211
2212        let json_str = r#"{
2213          "counter": {
2214            "value": 10.0
2215          },
2216          "kind": "incremental",
2217          "name": "test.transform.counter",
2218          "tags": {
2219            "env": "test_env",
2220            "host": "localhost"
2221          }
2222        }"#;
2223        let log = create_log_event_with_namespace(json_str, None);
2224        let metric = do_transform(config, log.clone()).await.unwrap();
2225        assert_eq!(
2226            *metric.as_metric(),
2227            Metric::new_with_metadata(
2228                "test.transform.counter",
2229                MetricKind::Incremental,
2230                MetricValue::Counter { value: 10.0 },
2231                metric.metadata().clone(),
2232            )
2233            .with_tags(Some(metric_tags!(
2234                "env" => "test_env",
2235                "host" => "localhost",
2236            )))
2237            .with_timestamp(Some(ts()))
2238        );
2239    }
2240}