vector/transforms/
log_to_metric.rs

1use std::{collections::HashMap, num::ParseFloatError, sync::Arc};
2
3use chrono::Utc;
4use indexmap::IndexMap;
5use vector_lib::{
6    configurable::configurable_component,
7    event::{
8        DatadogMetricOriginMetadata, LogEvent,
9        metric::{Bucket, Quantile, Sample},
10    },
11};
12use vrl::{
13    event_path, path,
14    path::{PathParseError, parse_target_path},
15};
16
17use crate::{
18    common::expansion::pair_expansion,
19    config::{
20        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
21        TransformOutput, schema::Definition,
22    },
23    event::{
24        Event, Value,
25        metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
26    },
27    internal_events::{
28        DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
29        MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
30        MetricMetadataParseError, ParserMissingFieldError,
31    },
32    schema,
33    template::{Template, TemplateRenderingError},
34    transforms::{
35        FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
36    },
37};
38
/// Origin service identifier placed in the `DatadogMetricOriginMetadata` of metrics built by
/// `to_metric_with_config`.
// NOTE(review): presumably `3` is the identifier assigned to this transform — confirm against the
// origin metadata definitions.
const ORIGIN_SERVICE_VALUE: u32 = 3;
40
/// Configuration for the `log_to_metric` transform.
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate.
    pub metrics: Option<Vec<MetricConfig>>,

    /// Setting this flag changes the behavior of this transformation.
    /// Notably the `metrics` field will be ignored.
    /// All incoming events will be processed and if possible they will be converted to metric events.
    /// Otherwise, only items specified in the `metrics` field will be processed.
    ///
    /// Example:
    /// <pre class="chroma"><code class="language-toml" data-lang="toml">{
    ///     "counter": {
    ///         "value": 10.0
    ///     },
    ///     "kind": "incremental",
    ///     "name": "test.transform.counter",
    ///     "tags": {
    ///         "env": "test_env",
    ///         "host": "localhost"
    ///     }
    /// }
    /// </code></pre>
    ///
    /// This is a JSON representation of a counter with the following properties:
    ///
    /// - `counter`: An object with a single property `value` representing the counter value, in this case, `10.0`.
    /// - `kind`: A string indicating the kind of counter, in this case, "incremental".
    /// - `name`: A string representing the name of the counter, here set to "test.transform.counter".
    /// - `tags`: An object containing additional tags such as "env" and "host".
    ///
    /// Objects that can be processed include counter, histogram, gauge, set and summary.
    pub all_metrics: Option<bool>,
}
78
/// Specification of a counter derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the value in `field`, instead of only by `1`.
    // Falls back to `default_increment_by_value()` (`false`) when omitted from the configuration.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    // Falls back to `default_kind()` (`MetricKind::Incremental`) when omitted from the
    // configuration.
    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}
91
/// Specification of a metric derived from a log event.
// TODO: While we're resolving the schema for this enum somewhat reasonably (in
// `generate-components-docs.rb`), we have a problem where an overlapping field (overlap between two
// or more of the subschemas) takes the details of the last subschema to be iterated over that
// contains that field, such that, for example, the `Summary` variant below is overriding the
// description for almost all of the fields because they're shared across all of the variants.
// NOTE(review): the TODO above talks about an enum and a `Summary` variant, but it is attached to
// a struct — it probably predates the split into `MetricConfig` + `MetricTypeConfig`; confirm
// whether it still applies.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Name of the field in the event to generate the metric.
    pub field: Template,

    /// Overrides the name of the metric.
    ///
    /// If not specified, `field` is used as the name of the metric.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    ///
    /// Both keys and values can be templated, allowing you to attach dynamic tags to events.
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    // Flattened: the keys of `MetricTypeConfig` (including its `type` tag) sit next to the fields
    // above in the serialized form.
    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}
123
/// Specification of the value of a created tag.
///
/// This may be a single value, a `null` for a bare tag, or an array of either.
// `untagged`: there is no discriminator key; serde tries the variants in declaration order and
// keeps the first one whose shape matches the input.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    // `None` is rendered as a bare tag (`TagValue::Bare`) by `render_tag_into`.
    Plain(Option<Template>),

    /// An array of values to give to the same tag name.
    // Each element is rendered independently under the same tag name by `render_tags`.
    Multi(Vec<Option<Template>>),
}
137
/// Specification of the type of an individual metric, and any associated data.
// Internally tagged: the variant is selected by the `type` key, with variant names spelled in
// snake_case.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    Counter(CounterConfig),

    /// A histogram.
    Histogram,

    /// A gauge.
    Gauge,

    /// A set.
    Set,

    /// A summary.
    Summary,
}
159
impl MetricConfig {
    /// Returns the string behind the `field` template, as exposed by `Template::get_ref`.
    ///
    /// `to_metric_with_config` parses this string as the target path of the value to read.
    fn field(&self) -> &str {
        self.field.get_ref()
    }
}
165
/// Serde default for `CounterConfig::increment_by_value`: counters increment by `1` unless told
/// otherwise.
const fn default_increment_by_value() -> bool {
    false
}
169
/// Serde default for `CounterConfig::kind`: counters are incremental unless told otherwise.
const fn default_kind() -> MetricKind {
    MetricKind::Incremental
}
173
/// The `log_to_metric` transform itself, built from a `LogToMetricConfig`.
#[derive(Debug, Clone)]
pub struct LogToMetric {
    /// Metric specifications applied to every event when `all_metrics` is `false`.
    pub metrics: Vec<MetricConfig>,
    /// When `true`, `metrics` is not consulted and each log event is converted through
    /// `to_metrics` instead.
    pub all_metrics: bool,
}
179
180impl GenerateConfig for LogToMetricConfig {
181    fn generate_config() -> toml::Value {
182        toml::Value::try_from(Self {
183            metrics: Some(vec![MetricConfig {
184                field: "field_name".try_into().expect("Fixed template"),
185                name: None,
186                namespace: None,
187                tags: None,
188                metric: MetricTypeConfig::Counter(CounterConfig {
189                    increment_by_value: false,
190                    kind: MetricKind::Incremental,
191                }),
192            }]),
193            all_metrics: Some(true),
194        })
195        .unwrap()
196    }
197}
198
199#[async_trait::async_trait]
200#[typetag::serde(name = "log_to_metric")]
201impl TransformConfig for LogToMetricConfig {
202    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
203        Ok(Transform::function(LogToMetric {
204            metrics: self.metrics.clone().unwrap_or_default(),
205            all_metrics: self.all_metrics.unwrap_or_default(),
206        }))
207    }
208
209    fn input(&self) -> Input {
210        Input::log()
211    }
212
213    fn outputs(
214        &self,
215        _: &TransformContext,
216        _: &[(OutputId, schema::Definition)],
217    ) -> Vec<TransformOutput> {
218        // Converting the log to a metric means we lose all incoming `Definition`s.
219        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
220    }
221
222    fn enable_concurrency(&self) -> bool {
223        true
224    }
225}
226
/// Kinds of `TransformError` raised while parsing a value.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// Error when parsing a float.
    FloatError,
    /// Error when parsing an integer.
    IntError,
    /// Error when parsing an array.
    ArrayError,
}
238
239impl std::fmt::Display for TransformParseErrorKind {
240    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
241        write!(f, "{self:?}")
242    }
243}
244
/// Reasons a log event could not be converted into a metric.
enum TransformError {
    /// The path could not be parsed, or no value exists at that path in the log event.
    PathNotFound {
        path: String,
    },
    /// The value at the path is `null`.
    PathNull {
        path: String,
    },
    /// None of the recognized metric keys was found at the root of the log event.
    MetricDetailsNotFound,
    /// The value at the path is not one of the accepted values (e.g. an unknown `kind`).
    MetricValueError {
        path: String,
        path_value: String,
    },
    /// The value at the path does not have the expected type (float, integer or array).
    ParseError {
        path: String,
        kind: TransformParseErrorKind,
    },
    /// The string form of the value at the path could not be parsed as a float.
    ParseFloatError {
        path: String,
        error: ParseFloatError,
    },
    /// A template failed to render against the event.
    TemplateRenderingError(TemplateRenderingError),
    /// `pair_expansion` failed for the rendered tag key/value pair.
    PairExpansionError {
        key: String,
        value: String,
        error: serde_json::Error,
    },
}
272
273fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
274    template
275        .render_string(event)
276        .map_err(TransformError::TemplateRenderingError)
277}
278
279fn render_tags(
280    tags: &Option<IndexMap<Template, TagConfig>>,
281    event: &Event,
282) -> Result<Option<MetricTags>, TransformError> {
283    let mut static_tags: HashMap<String, String> = HashMap::new();
284    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
285    Ok(match tags {
286        None => None,
287        Some(tags) => {
288            let mut result = MetricTags::default();
289            for (name, config) in tags {
290                match config {
291                    TagConfig::Plain(template) => {
292                        render_tag_into(
293                            event,
294                            name,
295                            template.as_ref(),
296                            &mut result,
297                            &mut static_tags,
298                            &mut dynamic_tags,
299                        )?;
300                    }
301                    TagConfig::Multi(vec) => {
302                        for template in vec {
303                            render_tag_into(
304                                event,
305                                name,
306                                template.as_ref(),
307                                &mut result,
308                                &mut static_tags,
309                                &mut dynamic_tags,
310                            )?;
311                        }
312                    }
313                }
314            }
315            for (k, v) in static_tags {
316                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
317                    warn!(
318                        "Static tags overrides dynamic tags. \
319                key: {}, value: {:?}, discarded value: {:?}",
320                        k, v, discarded_v
321                    );
322                };
323            }
324            result.as_option()
325        }
326    })
327}
328
329fn render_tag_into(
330    event: &Event,
331    key_template: &Template,
332    value_template: Option<&Template>,
333    result: &mut MetricTags,
334    static_tags: &mut HashMap<String, String>,
335    dynamic_tags: &mut HashMap<String, String>,
336) -> Result<(), TransformError> {
337    let key = match render_template(key_template, event) {
338        Ok(key_s) => key_s,
339        Err(TransformError::TemplateRenderingError(err)) => {
340            emit!(crate::internal_events::TemplateRenderingError {
341                error: err,
342                drop_event: false,
343                field: Some(key_template.get_ref()),
344            });
345            return Ok(());
346        }
347        Err(err) => return Err(err),
348    };
349    match value_template {
350        None => {
351            result.insert(key, TagValue::Bare);
352        }
353        Some(template) => match render_template(template, event) {
354            Ok(value) => {
355                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
356                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
357                result.extend(expanded_pairs);
358            }
359            Err(TransformError::TemplateRenderingError(value_error)) => {
360                emit!(crate::internal_events::TemplateRenderingError {
361                    error: value_error,
362                    drop_event: false,
363                    field: Some(template.get_ref()),
364                });
365                return Ok(());
366            }
367            Err(other) => return Err(other),
368        },
369    };
370    Ok(())
371}
372
373fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
374    let log = event.as_log();
375
376    let timestamp = log
377        .get_timestamp()
378        .and_then(Value::as_timestamp)
379        .cloned()
380        .or_else(|| Some(Utc::now()));
381
382    // Assign the OriginService for the new metric
383    let metadata = event
384        .metadata()
385        .clone()
386        .with_schema_definition(&Arc::new(Definition::any()))
387        .with_origin_metadata(DatadogMetricOriginMetadata::new(
388            None,
389            None,
390            Some(ORIGIN_SERVICE_VALUE),
391        ));
392
393    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
394        path: config.field().to_string(),
395    })?;
396
397    let value = match log.get(&field) {
398        None => Err(TransformError::PathNotFound {
399            path: field.to_string(),
400        }),
401        Some(Value::Null) => Err(TransformError::PathNull {
402            path: field.to_string(),
403        }),
404        Some(value) => Ok(value),
405    }?;
406
407    let name = config.name.as_ref().unwrap_or(&config.field);
408    let name = render_template(name, event)?;
409
410    let namespace = config.namespace.as_ref();
411    let namespace = namespace
412        .map(|namespace| render_template(namespace, event))
413        .transpose()?;
414
415    let tags = render_tags(&config.tags, event)?;
416
417    let (kind, value) = match &config.metric {
418        MetricTypeConfig::Counter(counter) => {
419            let value = if counter.increment_by_value {
420                value.to_string_lossy().parse().map_err(|error| {
421                    TransformError::ParseFloatError {
422                        path: config.field.get_ref().to_owned(),
423                        error,
424                    }
425                })?
426            } else {
427                1.0
428            };
429
430            (counter.kind, MetricValue::Counter { value })
431        }
432        MetricTypeConfig::Histogram => {
433            let value = value.to_string_lossy().parse().map_err(|error| {
434                TransformError::ParseFloatError {
435                    path: field.to_string(),
436                    error,
437                }
438            })?;
439
440            (
441                MetricKind::Incremental,
442                MetricValue::Distribution {
443                    samples: vector_lib::samples![value => 1],
444                    statistic: StatisticKind::Histogram,
445                },
446            )
447        }
448        MetricTypeConfig::Summary => {
449            let value = value.to_string_lossy().parse().map_err(|error| {
450                TransformError::ParseFloatError {
451                    path: field.to_string(),
452                    error,
453                }
454            })?;
455
456            (
457                MetricKind::Incremental,
458                MetricValue::Distribution {
459                    samples: vector_lib::samples![value => 1],
460                    statistic: StatisticKind::Summary,
461                },
462            )
463        }
464        MetricTypeConfig::Gauge => {
465            let value = value.to_string_lossy().parse().map_err(|error| {
466                TransformError::ParseFloatError {
467                    path: field.to_string(),
468                    error,
469                }
470            })?;
471
472            (MetricKind::Absolute, MetricValue::Gauge { value })
473        }
474        MetricTypeConfig::Set => {
475            let value = value.to_string_lossy().into_owned();
476
477            (
478                MetricKind::Incremental,
479                MetricValue::Set {
480                    values: std::iter::once(value).collect(),
481                },
482            )
483        }
484    };
485    Ok(Metric::new_with_metadata(name, kind, value, metadata)
486        .with_namespace(namespace)
487        .with_tags(tags)
488        .with_timestamp(timestamp))
489}
490
491fn bytes_to_str(value: &Value) -> Option<String> {
492    match value {
493        Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
494        _ => None,
495    }
496}
497
498fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
499    // TODO: update returned errors after `TransformError` is refactored.
500    let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
501        PathParseError::InvalidPathSyntax { path } => PathNotFound {
502            path: path.to_string(),
503        },
504    })?;
505    match maybe_value {
506        None => Err(PathNotFound {
507            path: path.to_string(),
508        }),
509        Some(v) => Ok(bytes_to_str(v)),
510    }
511}
512
513fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
514    let counter_value = log
515        .get(event_path!("counter", "value"))
516        .ok_or_else(|| TransformError::PathNotFound {
517            path: "counter.value".to_string(),
518        })?
519        .as_float()
520        .ok_or_else(|| TransformError::ParseError {
521            path: "counter.value".to_string(),
522            kind: TransformParseErrorKind::FloatError,
523        })?;
524
525    Ok(MetricValue::Counter {
526        value: *counter_value,
527    })
528}
529
530fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
531    let gauge_value = log
532        .get(event_path!("gauge", "value"))
533        .ok_or_else(|| TransformError::PathNotFound {
534            path: "gauge.value".to_string(),
535        })?
536        .as_float()
537        .ok_or_else(|| TransformError::ParseError {
538            path: "gauge.value".to_string(),
539            kind: TransformParseErrorKind::FloatError,
540        })?;
541    Ok(MetricValue::Gauge {
542        value: *gauge_value,
543    })
544}
545
546fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
547    let set_values = log
548        .get(event_path!("set", "values"))
549        .ok_or_else(|| TransformError::PathNotFound {
550            path: "set.values".to_string(),
551        })?
552        .as_array()
553        .ok_or_else(|| TransformError::ParseError {
554            path: "set.values".to_string(),
555            kind: TransformParseErrorKind::ArrayError,
556        })?;
557
558    let mut values: Vec<String> = Vec::new();
559    for e_value in set_values {
560        let value = e_value
561            .as_bytes()
562            .ok_or_else(|| TransformError::ParseError {
563                path: "set.values".to_string(),
564                kind: TransformParseErrorKind::ArrayError,
565            })?;
566        values.push(String::from_utf8_lossy(value).to_string());
567    }
568
569    Ok(MetricValue::Set {
570        values: values.into_iter().collect(),
571    })
572}
573
574fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
575    let event_samples = log
576        .get(event_path!("distribution", "samples"))
577        .ok_or_else(|| TransformError::PathNotFound {
578            path: "distribution.samples".to_string(),
579        })?
580        .as_array()
581        .ok_or_else(|| TransformError::ParseError {
582            path: "distribution.samples".to_string(),
583            kind: TransformParseErrorKind::ArrayError,
584        })?;
585
586    let mut samples: Vec<Sample> = Vec::new();
587    for e_sample in event_samples {
588        let value = e_sample
589            .get(path!("value"))
590            .ok_or_else(|| TransformError::PathNotFound {
591                path: "value".to_string(),
592            })?
593            .as_float()
594            .ok_or_else(|| TransformError::ParseError {
595                path: "value".to_string(),
596                kind: TransformParseErrorKind::FloatError,
597            })?;
598
599        let rate = e_sample
600            .get(path!("rate"))
601            .ok_or_else(|| TransformError::PathNotFound {
602                path: "rate".to_string(),
603            })?
604            .as_integer()
605            .ok_or_else(|| TransformError::ParseError {
606                path: "rate".to_string(),
607                kind: TransformParseErrorKind::IntError,
608            })?;
609
610        samples.push(Sample {
611            value: *value,
612            rate: rate as u32,
613        });
614    }
615
616    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
617        Some(n) => n,
618        None => {
619            return Err(TransformError::PathNotFound {
620                path: "distribution.statistic".to_string(),
621            });
622        }
623    };
624    let statistic_kind = match statistic_str.as_str() {
625        "histogram" => Ok(StatisticKind::Histogram),
626        "summary" => Ok(StatisticKind::Summary),
627        _ => Err(TransformError::MetricValueError {
628            path: "distribution.statistic".to_string(),
629            path_value: statistic_str.to_string(),
630        }),
631    }?;
632
633    Ok(MetricValue::Distribution {
634        samples,
635        statistic: statistic_kind,
636    })
637}
638
639fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
640    let event_buckets = log
641        .get(event_path!("aggregated_histogram", "buckets"))
642        .ok_or_else(|| TransformError::PathNotFound {
643            path: "aggregated_histogram.buckets".to_string(),
644        })?
645        .as_array()
646        .ok_or_else(|| TransformError::ParseError {
647            path: "aggregated_histogram.buckets".to_string(),
648            kind: TransformParseErrorKind::ArrayError,
649        })?;
650
651    let mut buckets: Vec<Bucket> = Vec::new();
652    for e_bucket in event_buckets {
653        let upper_limit = e_bucket
654            .get(path!("upper_limit"))
655            .ok_or_else(|| TransformError::PathNotFound {
656                path: "aggregated_histogram.buckets.upper_limit".to_string(),
657            })?
658            .as_float()
659            .ok_or_else(|| TransformError::ParseError {
660                path: "aggregated_histogram.buckets.upper_limit".to_string(),
661                kind: TransformParseErrorKind::FloatError,
662            })?;
663
664        let count = e_bucket
665            .get(path!("count"))
666            .ok_or_else(|| TransformError::PathNotFound {
667                path: "aggregated_histogram.buckets.count".to_string(),
668            })?
669            .as_integer()
670            .ok_or_else(|| TransformError::ParseError {
671                path: "aggregated_histogram.buckets.count".to_string(),
672                kind: TransformParseErrorKind::IntError,
673            })?;
674
675        buckets.push(Bucket {
676            upper_limit: *upper_limit,
677            count: count as u64,
678        });
679    }
680
681    let count = log
682        .get(event_path!("aggregated_histogram", "count"))
683        .ok_or_else(|| TransformError::PathNotFound {
684            path: "aggregated_histogram.count".to_string(),
685        })?
686        .as_integer()
687        .ok_or_else(|| TransformError::ParseError {
688            path: "aggregated_histogram.count".to_string(),
689            kind: TransformParseErrorKind::IntError,
690        })?;
691
692    let sum = log
693        .get(event_path!("aggregated_histogram", "sum"))
694        .ok_or_else(|| TransformError::PathNotFound {
695            path: "aggregated_histogram.sum".to_string(),
696        })?
697        .as_float()
698        .ok_or_else(|| TransformError::ParseError {
699            path: "aggregated_histogram.sum".to_string(),
700            kind: TransformParseErrorKind::FloatError,
701        })?;
702
703    Ok(MetricValue::AggregatedHistogram {
704        buckets,
705        count: count as u64,
706        sum: *sum,
707    })
708}
709
710fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
711    let event_quantiles = log
712        .get(event_path!("aggregated_summary", "quantiles"))
713        .ok_or_else(|| TransformError::PathNotFound {
714            path: "aggregated_summary.quantiles".to_string(),
715        })?
716        .as_array()
717        .ok_or_else(|| TransformError::ParseError {
718            path: "aggregated_summary.quantiles".to_string(),
719            kind: TransformParseErrorKind::ArrayError,
720        })?;
721
722    let mut quantiles: Vec<Quantile> = Vec::new();
723    for e_quantile in event_quantiles {
724        let quantile = e_quantile
725            .get(path!("quantile"))
726            .ok_or_else(|| TransformError::PathNotFound {
727                path: "aggregated_summary.quantiles.quantile".to_string(),
728            })?
729            .as_float()
730            .ok_or_else(|| TransformError::ParseError {
731                path: "aggregated_summary.quantiles.quantile".to_string(),
732                kind: TransformParseErrorKind::FloatError,
733            })?;
734
735        let value = e_quantile
736            .get(path!("value"))
737            .ok_or_else(|| TransformError::PathNotFound {
738                path: "aggregated_summary.quantiles.value".to_string(),
739            })?
740            .as_float()
741            .ok_or_else(|| TransformError::ParseError {
742                path: "aggregated_summary.quantiles.value".to_string(),
743                kind: TransformParseErrorKind::FloatError,
744            })?;
745
746        quantiles.push(Quantile {
747            quantile: *quantile,
748            value: *value,
749        })
750    }
751
752    let count = log
753        .get(event_path!("aggregated_summary", "count"))
754        .ok_or_else(|| TransformError::PathNotFound {
755            path: "aggregated_summary.count".to_string(),
756        })?
757        .as_integer()
758        .ok_or_else(|| TransformError::ParseError {
759            path: "aggregated_summary.count".to_string(),
760            kind: TransformParseErrorKind::IntError,
761        })?;
762
763    let sum = log
764        .get(event_path!("aggregated_summary", "sum"))
765        .ok_or_else(|| TransformError::PathNotFound {
766            path: "aggregated_summary.sum".to_string(),
767        })?
768        .as_float()
769        .ok_or_else(|| TransformError::ParseError {
770            path: "aggregated_summary.sum".to_string(),
771            kind: TransformParseErrorKind::FloatError,
772        })?;
773
774    Ok(MetricValue::AggregatedSummary {
775        quantiles,
776        count: count as u64,
777        sum: *sum,
778    })
779}
780
781fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
782    let log = event.as_log();
783    let timestamp = log
784        .get_timestamp()
785        .and_then(Value::as_timestamp)
786        .cloned()
787        .or_else(|| Some(Utc::now()));
788
789    let name = match try_get_string_from_log(log, "name")? {
790        Some(n) => n,
791        None => {
792            return Err(TransformError::PathNotFound {
793                path: "name".to_string(),
794            });
795        }
796    };
797
798    let mut tags = MetricTags::default();
799
800    if let Some(els) = log.get(event_path!("tags"))
801        && let Some(el) = els.as_object()
802    {
803        for (key, value) in el {
804            tags.insert(key.to_string(), bytes_to_str(value));
805        }
806    }
807    let tags_result = Some(tags);
808
809    let kind_str = match try_get_string_from_log(log, "kind")? {
810        Some(n) => n,
811        None => {
812            return Err(TransformError::PathNotFound {
813                path: "kind".to_string(),
814            });
815        }
816    };
817
818    let kind = match kind_str.as_str() {
819        "absolute" => Ok(MetricKind::Absolute),
820        "incremental" => Ok(MetricKind::Incremental),
821        value => Err(TransformError::MetricValueError {
822            path: "kind".to_string(),
823            path_value: value.to_string(),
824        }),
825    }?;
826
827    let mut value: Option<MetricValue> = None;
828    if let Some(root_event) = log.as_map() {
829        for key in root_event.keys() {
830            value = match key.as_str() {
831                "gauge" => Some(get_gauge_value(log)?),
832                "distribution" => Some(get_distribution_value(log)?),
833                "aggregated_histogram" => Some(get_histogram_value(log)?),
834                "aggregated_summary" => Some(get_summary_value(log)?),
835                "counter" => Some(get_counter_value(log)?),
836                "set" => Some(get_set_value(log)?),
837                _ => None,
838            };
839
840            if value.is_some() {
841                break;
842            }
843        }
844    }
845
846    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
847
848    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
849        .with_tags(tags_result)
850        .with_timestamp(timestamp);
851
852    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
853        metric = metric.with_namespace(namespace);
854    }
855
856    Ok(metric)
857}
858
859impl FunctionTransform for LogToMetric {
860    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
861        // Metrics are "all or none" for a specific log. If a single fails, none are produced.
862        let mut buffer = Vec::with_capacity(self.metrics.len());
863        if self.all_metrics {
864            match to_metrics(&event) {
865                Ok(metric) => {
866                    output.push(Event::Metric(metric));
867                }
868                Err(err) => {
869                    match err {
870                        TransformError::MetricValueError { path, path_value } => {
871                            emit!(MetricMetadataInvalidFieldValueError {
872                                field: path.as_ref(),
873                                field_value: path_value.as_ref()
874                            })
875                        }
876                        TransformError::PathNotFound { path } => {
877                            emit!(ParserMissingFieldError::<DROP_EVENT> {
878                                field: path.as_ref()
879                            })
880                        }
881                        TransformError::ParseError { path, kind } => {
882                            emit!(MetricMetadataParseError {
883                                field: path.as_ref(),
884                                kind: &kind.to_string(),
885                            })
886                        }
887                        TransformError::MetricDetailsNotFound => {
888                            emit!(MetricMetadataMetricDetailsNotFoundError {})
889                        }
890                        TransformError::PairExpansionError { key, value, error } => {
891                            emit!(crate::internal_events::PairExpansionError {
892                                key: &key,
893                                value: &value,
894                                drop_event: true,
895                                error
896                            })
897                        }
898                        _ => {}
899                    };
900                }
901            }
902        } else {
903            for config in self.metrics.iter() {
904                match to_metric_with_config(config, &event) {
905                    Ok(metric) => {
906                        buffer.push(Event::Metric(metric));
907                    }
908                    Err(err) => {
909                        match err {
910                            TransformError::PathNull { path } => {
911                                emit!(LogToMetricFieldNullError {
912                                    field: path.as_ref()
913                                })
914                            }
915                            TransformError::PathNotFound { path } => {
916                                emit!(ParserMissingFieldError::<DROP_EVENT> {
917                                    field: path.as_ref()
918                                })
919                            }
920                            TransformError::ParseFloatError { path, error } => {
921                                emit!(LogToMetricParseFloatError {
922                                    field: path.as_ref(),
923                                    error
924                                })
925                            }
926                            TransformError::TemplateRenderingError(error) => {
927                                emit!(crate::internal_events::TemplateRenderingError {
928                                    error,
929                                    drop_event: true,
930                                    field: None,
931                                })
932                            }
933                            TransformError::PairExpansionError { key, value, error } => {
934                                emit!(crate::internal_events::PairExpansionError {
935                                    key: &key,
936                                    value: &value,
937                                    drop_event: true,
938                                    error
939                                })
940                            }
941                            _ => {}
942                        };
943                        // early return to prevent the partial buffer from being sent
944                        return;
945                    }
946                }
947            }
948        }
949
950        // Metric generation was successful, publish them all.
951        for event in buffer {
952            output.push(event);
953        }
954    }
955}
956
957#[cfg(test)]
958mod tests {
959    use std::{sync::Arc, time::Duration};
960
961    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
962    use similar_asserts::assert_eq;
963    use tokio::sync::mpsc;
964    use tokio_stream::wrappers::ReceiverStream;
965    use vector_lib::{
966        config::ComponentKey,
967        event::{EventMetadata, ObjectMap},
968        metric_tags,
969    };
970
971    use super::*;
972    use crate::{
973        config::log_schema,
974        event::{
975            Event, LogEvent,
976            metric::{Metric, MetricKind, MetricValue, StatisticKind},
977        },
978        test_util::components::assert_transform_compliance,
979        transforms::test::create_topology,
980    };
981
    // Source component ID that `set_test_source_metadata` stamps onto event metadata.
    const TEST_SOURCE_COMPONENT_ID: &str = "in";
    // Upstream component ID that `set_test_source_metadata` stamps onto event metadata.
    const TEST_UPSTREAM_COMPONENT_ID: &str = "transform";
    // Source type that `set_test_source_metadata` stamps onto event metadata.
    const TEST_SOURCE_TYPE: &str = "unit_test_stream";
    // Namespace inserted into log events built by `create_log_event`.
    const TEST_NAMESPACE: &str = "test_namespace";
986
987    #[test]
988    fn generate_config() {
989        crate::test_util::test_generate_config::<LogToMetricConfig>();
990    }
991
992    fn parse_config(s: &str) -> LogToMetricConfig {
993        toml::from_str(s).unwrap()
994    }
995
996    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
997        serde_yaml::from_str(s).unwrap()
998    }
999
1000    fn ts() -> DateTime<Utc> {
1001        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1002            .single()
1003            .and_then(|t| t.with_nanosecond(11))
1004            .expect("invalid timestamp")
1005    }
1006
1007    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1008        let mut log = Event::Log(LogEvent::from("i am a log"));
1009        log.as_mut_log().insert(key, value);
1010        log.as_mut_log()
1011            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1012        log
1013    }
1014
1015    fn set_test_source_metadata(metadata: &mut EventMetadata) {
1016        metadata.set_upstream_id(Arc::new(OutputId::from(TEST_UPSTREAM_COMPONENT_ID)));
1017        metadata.set_source_id(Arc::new(ComponentKey::from(TEST_SOURCE_COMPONENT_ID)));
1018        metadata.set_source_type(TEST_SOURCE_TYPE);
1019    }
1020
1021    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1022        assert_transform_compliance(async move {
1023            let (tx, rx) = mpsc::channel(1);
1024            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1025            tx.send(event).await.unwrap();
1026            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1027                .await
1028                .unwrap_or(None);
1029            drop(tx);
1030            topology.stop().await;
1031            assert_eq!(out.recv().await, None);
1032            result
1033        })
1034        .await
1035    }
1036
1037    async fn do_transform_multiple_events(
1038        config: LogToMetricConfig,
1039        event: Event,
1040        count: usize,
1041    ) -> Vec<Event> {
1042        assert_transform_compliance(async move {
1043            let (tx, rx) = mpsc::channel(1);
1044            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1045            tx.send(event).await.unwrap();
1046
1047            let mut results = vec![];
1048            for _ in 0..count {
1049                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1050                    .await
1051                    .unwrap_or(None);
1052                if let Some(event) = result {
1053                    results.push(event);
1054                }
1055            }
1056
1057            drop(tx);
1058            topology.stop().await;
1059            assert_eq!(out.recv().await, None);
1060            results
1061        })
1062        .await
1063    }
1064
1065    #[tokio::test]
1066    async fn count_http_status_codes() {
1067        let config = parse_config(
1068            r#"
1069            [[metrics]]
1070            type = "counter"
1071            field = "status"
1072            "#,
1073        );
1074
1075        let event = create_event("status", "42");
1076        let mut metadata =
1077            event
1078                .metadata()
1079                .clone()
1080                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1081                    None,
1082                    None,
1083                    Some(ORIGIN_SERVICE_VALUE),
1084                ));
1085        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1086        metadata.set_schema_definition(&Arc::new(Definition::any()));
1087        set_test_source_metadata(&mut metadata);
1088        let metric = do_transform(config, event).await.unwrap();
1089
1090        assert_eq!(
1091            metric.into_metric(),
1092            Metric::new_with_metadata(
1093                "status",
1094                MetricKind::Incremental,
1095                MetricValue::Counter { value: 1.0 },
1096                metadata,
1097            )
1098            .with_timestamp(Some(ts()))
1099        );
1100    }
1101
1102    #[tokio::test]
1103    async fn count_http_requests_with_tags() {
1104        let config = parse_config(
1105            r#"
1106            [[metrics]]
1107            type = "counter"
1108            field = "message"
1109            name = "http_requests_total"
1110            namespace = "app"
1111            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1112            "#,
1113        );
1114
1115        let mut event = create_event("message", "i am log");
1116        event.as_mut_log().insert("method", "post");
1117        event.as_mut_log().insert("code", "200");
1118        let mut metadata =
1119            event
1120                .metadata()
1121                .clone()
1122                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1123                    None,
1124                    None,
1125                    Some(ORIGIN_SERVICE_VALUE),
1126                ));
1127        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1128        metadata.set_schema_definition(&Arc::new(Definition::any()));
1129        set_test_source_metadata(&mut metadata);
1130
1131        let metric = do_transform(config, event).await.unwrap();
1132
1133        assert_eq!(
1134            metric.into_metric(),
1135            Metric::new_with_metadata(
1136                "http_requests_total",
1137                MetricKind::Incremental,
1138                MetricValue::Counter { value: 1.0 },
1139                metadata,
1140            )
1141            .with_namespace(Some("app"))
1142            .with_tags(Some(metric_tags!(
1143                "method" => "post",
1144                "code" => "200",
1145                "host" => "localhost",
1146            )))
1147            .with_timestamp(Some(ts()))
1148        );
1149    }
1150
1151    #[tokio::test]
1152    async fn count_http_requests_with_tags_expansion() {
1153        let config = parse_config(
1154            r#"
1155            [[metrics]]
1156            type = "counter"
1157            field = "message"
1158            name = "http_requests_total"
1159            namespace = "app"
1160            tags = {"*" = "{{ dict }}"}
1161            "#,
1162        );
1163
1164        let mut event = create_event("message", "i am log");
1165        let log = event.as_mut_log();
1166
1167        let mut test_dict = ObjectMap::default();
1168        test_dict.insert("one".into(), Value::from("foo"));
1169        test_dict.insert("two".into(), Value::from("baz"));
1170        log.insert("dict", Value::from(test_dict));
1171
1172        let mut metadata =
1173            event
1174                .metadata()
1175                .clone()
1176                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1177                    None,
1178                    None,
1179                    Some(ORIGIN_SERVICE_VALUE),
1180                ));
1181        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1182        metadata.set_schema_definition(&Arc::new(Definition::any()));
1183        set_test_source_metadata(&mut metadata);
1184
1185        let metric = do_transform(config, event).await.unwrap();
1186
1187        assert_eq!(
1188            metric.into_metric(),
1189            Metric::new_with_metadata(
1190                "http_requests_total",
1191                MetricKind::Incremental,
1192                MetricValue::Counter { value: 1.0 },
1193                metadata,
1194            )
1195            .with_namespace(Some("app"))
1196            .with_tags(Some(metric_tags!(
1197                "one" => "foo",
1198                "two" => "baz",
1199            )))
1200            .with_timestamp(Some(ts()))
1201        );
1202    }
1203    #[tokio::test]
1204    async fn count_http_requests_with_colliding_dynamic_tags() {
1205        let config = parse_config(
1206            r#"
1207            [[metrics]]
1208            type = "counter"
1209            field = "message"
1210            name = "http_requests_total"
1211            namespace = "app"
1212            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1213            "#,
1214        );
1215
1216        let mut event = create_event("message", "i am log");
1217        let log = event.as_mut_log();
1218
1219        let mut map1 = ObjectMap::default();
1220        map1.insert("key1".into(), Value::from("val1"));
1221        log.insert("map1", Value::from(map1));
1222
1223        let mut map2 = ObjectMap::default();
1224        map2.insert("l1_key1".into(), Value::from("val2"));
1225        log.insert("map2", Value::from(map2));
1226
1227        let mut metadata =
1228            event
1229                .metadata()
1230                .clone()
1231                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1232                    None,
1233                    None,
1234                    Some(ORIGIN_SERVICE_VALUE),
1235                ));
1236        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1237        metadata.set_schema_definition(&Arc::new(Definition::any()));
1238        set_test_source_metadata(&mut metadata);
1239
1240        let metric = do_transform(config, event).await.unwrap().into_metric();
1241        let tags = metric.tags().expect("Metric should have tags");
1242
1243        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1244
1245        assert_eq!(tags.iter_all().count(), 2);
1246        for (name, value) in tags.iter_all() {
1247            assert_eq!(name, "l1_key1");
1248            assert!(value == Some("val1") || value == Some("val2"));
1249        }
1250    }
1251    #[tokio::test]
1252    async fn multi_value_tags_yaml() {
1253        // Have to use YAML to represent bare tags
1254        let config = parse_yaml_config(
1255            r#"
1256            metrics:
1257            - field: "message"
1258              type: "counter"
1259              tags:
1260                tag:
1261                - "one"
1262                - null
1263                - "two"
1264            "#,
1265        );
1266
1267        let event = create_event("message", "I am log");
1268        let metric = do_transform(config, event).await.unwrap().into_metric();
1269        let tags = metric.tags().expect("Metric should have tags");
1270
1271        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1272
1273        assert_eq!(tags.iter_all().count(), 3);
1274        for (name, value) in tags.iter_all() {
1275            assert_eq!(name, "tag");
1276            assert!(value.is_none() || value == Some("one") || value == Some("two"));
1277        }
1278    }
1279    #[tokio::test]
1280    async fn multi_value_tags_expansion_yaml() {
1281        // Have to use YAML to represent bare tags
1282        let config = parse_yaml_config(
1283            r#"
1284            metrics:
1285            - field: "message"
1286              type: "counter"
1287              tags:
1288                "*": "{{dict}}"
1289            "#,
1290        );
1291
1292        let mut event = create_event("message", "I am log");
1293        let log = event.as_mut_log();
1294
1295        let mut test_dict = ObjectMap::default();
1296        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1297        log.insert("dict", Value::from(test_dict));
1298
1299        let metric = do_transform(config, event).await.unwrap().into_metric();
1300        let tags = metric.tags().expect("Metric should have tags");
1301
1302        assert_eq!(
1303            tags.iter_single().collect::<Vec<_>>(),
1304            vec![("one", "[\"foo\",\"baz\"]")]
1305        );
1306
1307        assert_eq!(tags.iter_all().count(), 1);
1308        for (name, value) in tags.iter_all() {
1309            assert_eq!(name, "one");
1310            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1311        }
1312    }
1313
1314    #[tokio::test]
1315    async fn multi_value_tags_toml() {
1316        let config = parse_config(
1317            r#"
1318            [[metrics]]
1319            field = "message"
1320            type = "counter"
1321            [metrics.tags]
1322            tag = ["one", "two"]
1323            "#,
1324        );
1325
1326        let event = create_event("message", "I am log");
1327        let metric = do_transform(config, event).await.unwrap().into_metric();
1328        let tags = metric.tags().expect("Metric should have tags");
1329
1330        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1331
1332        assert_eq!(tags.iter_all().count(), 2);
1333        for (name, value) in tags.iter_all() {
1334            assert_eq!(name, "tag");
1335            assert!(value == Some("one") || value == Some("two"));
1336        }
1337    }
1338
1339    #[tokio::test]
1340    async fn count_exceptions() {
1341        let config = parse_config(
1342            r#"
1343            [[metrics]]
1344            type = "counter"
1345            field = "backtrace"
1346            name = "exception_total"
1347            "#,
1348        );
1349
1350        let event = create_event("backtrace", "message");
1351        let mut metadata =
1352            event
1353                .metadata()
1354                .clone()
1355                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1356                    None,
1357                    None,
1358                    Some(ORIGIN_SERVICE_VALUE),
1359                ));
1360        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1361        metadata.set_schema_definition(&Arc::new(Definition::any()));
1362        set_test_source_metadata(&mut metadata);
1363
1364        let metric = do_transform(config, event).await.unwrap();
1365
1366        assert_eq!(
1367            metric.into_metric(),
1368            Metric::new_with_metadata(
1369                "exception_total",
1370                MetricKind::Incremental,
1371                MetricValue::Counter { value: 1.0 },
1372                metadata
1373            )
1374            .with_timestamp(Some(ts()))
1375        );
1376    }
1377
1378    #[tokio::test]
1379    async fn count_exceptions_no_match() {
1380        let config = parse_config(
1381            r#"
1382            [[metrics]]
1383            type = "counter"
1384            field = "backtrace"
1385            name = "exception_total"
1386            "#,
1387        );
1388
1389        let event = create_event("success", "42");
1390        assert_eq!(do_transform(config, event).await, None);
1391    }
1392
1393    #[tokio::test]
1394    async fn sum_order_amounts() {
1395        let config = parse_config(
1396            r#"
1397            [[metrics]]
1398            type = "counter"
1399            field = "amount"
1400            name = "amount_total"
1401            increment_by_value = true
1402            "#,
1403        );
1404
1405        let event = create_event("amount", "33.99");
1406        let mut metadata =
1407            event
1408                .metadata()
1409                .clone()
1410                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1411                    None,
1412                    None,
1413                    Some(ORIGIN_SERVICE_VALUE),
1414                ));
1415        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1416        metadata.set_schema_definition(&Arc::new(Definition::any()));
1417        set_test_source_metadata(&mut metadata);
1418        let metric = do_transform(config, event).await.unwrap();
1419
1420        assert_eq!(
1421            metric.into_metric(),
1422            Metric::new_with_metadata(
1423                "amount_total",
1424                MetricKind::Incremental,
1425                MetricValue::Counter { value: 33.99 },
1426                metadata,
1427            )
1428            .with_timestamp(Some(ts()))
1429        );
1430    }
1431
1432    #[tokio::test]
1433    async fn count_absolute() {
1434        let config = parse_config(
1435            r#"
1436            [[metrics]]
1437            type = "counter"
1438            field = "amount"
1439            name = "amount_total"
1440            increment_by_value = true
1441            kind = "absolute"
1442            "#,
1443        );
1444
1445        let event = create_event("amount", "33.99");
1446        let mut metadata =
1447            event
1448                .metadata()
1449                .clone()
1450                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1451                    None,
1452                    None,
1453                    Some(ORIGIN_SERVICE_VALUE),
1454                ));
1455        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1456        metadata.set_schema_definition(&Arc::new(Definition::any()));
1457        set_test_source_metadata(&mut metadata);
1458
1459        let metric = do_transform(config, event).await.unwrap();
1460
1461        assert_eq!(
1462            metric.into_metric(),
1463            Metric::new_with_metadata(
1464                "amount_total",
1465                MetricKind::Absolute,
1466                MetricValue::Counter { value: 33.99 },
1467                metadata,
1468            )
1469            .with_timestamp(Some(ts()))
1470        );
1471    }
1472
1473    #[tokio::test]
1474    async fn memory_usage_gauge() {
1475        let config = parse_config(
1476            r#"
1477            [[metrics]]
1478            type = "gauge"
1479            field = "memory_rss"
1480            name = "memory_rss_bytes"
1481            "#,
1482        );
1483
1484        let event = create_event("memory_rss", "123");
1485        let mut metadata =
1486            event
1487                .metadata()
1488                .clone()
1489                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1490                    None,
1491                    None,
1492                    Some(ORIGIN_SERVICE_VALUE),
1493                ));
1494
1495        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1496        metadata.set_schema_definition(&Arc::new(Definition::any()));
1497
1498        set_test_source_metadata(&mut metadata);
1499
1500        let metric = do_transform(config, event).await.unwrap();
1501
1502        assert_eq!(
1503            metric.into_metric(),
1504            Metric::new_with_metadata(
1505                "memory_rss_bytes",
1506                MetricKind::Absolute,
1507                MetricValue::Gauge { value: 123.0 },
1508                metadata,
1509            )
1510            .with_timestamp(Some(ts()))
1511        );
1512    }
1513
1514    #[tokio::test]
1515    async fn parse_failure() {
1516        let config = parse_config(
1517            r#"
1518            [[metrics]]
1519            type = "counter"
1520            field = "status"
1521            name = "status_total"
1522            increment_by_value = true
1523            "#,
1524        );
1525
1526        let event = create_event("status", "not a number");
1527        assert_eq!(do_transform(config, event).await, None);
1528    }
1529
1530    #[tokio::test]
1531    async fn missing_field() {
1532        let config = parse_config(
1533            r#"
1534            [[metrics]]
1535            type = "counter"
1536            field = "status"
1537            name = "status_total"
1538            "#,
1539        );
1540
1541        let event = create_event("not foo", "not a number");
1542        assert_eq!(do_transform(config, event).await, None);
1543    }
1544
1545    #[tokio::test]
1546    async fn null_field() {
1547        let config = parse_config(
1548            r#"
1549            [[metrics]]
1550            type = "counter"
1551            field = "status"
1552            name = "status_total"
1553            "#,
1554        );
1555
1556        let event = create_event("status", Value::Null);
1557        assert_eq!(do_transform(config, event).await, None);
1558    }
1559
1560    #[tokio::test]
1561    async fn multiple_metrics() {
1562        let config = parse_config(
1563            r#"
1564            [[metrics]]
1565            type = "counter"
1566            field = "status"
1567
1568            [[metrics]]
1569            type = "counter"
1570            field = "backtrace"
1571            name = "exception_total"
1572            "#,
1573        );
1574
1575        let mut event = Event::Log(LogEvent::from("i am a log"));
1576        event
1577            .as_mut_log()
1578            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1579        event.as_mut_log().insert("status", "42");
1580        event.as_mut_log().insert("backtrace", "message");
1581        let mut metadata =
1582            event
1583                .metadata()
1584                .clone()
1585                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1586                    None,
1587                    None,
1588                    Some(ORIGIN_SERVICE_VALUE),
1589                ));
1590
1591        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1592        metadata.set_schema_definition(&Arc::new(Definition::any()));
1593        set_test_source_metadata(&mut metadata);
1594
1595        let output = do_transform_multiple_events(config, event, 2).await;
1596
1597        assert_eq!(2, output.len());
1598        assert_eq!(
1599            output[0].clone().into_metric(),
1600            Metric::new_with_metadata(
1601                "status",
1602                MetricKind::Incremental,
1603                MetricValue::Counter { value: 1.0 },
1604                metadata.clone(),
1605            )
1606            .with_timestamp(Some(ts()))
1607        );
1608        assert_eq!(
1609            output[1].clone().into_metric(),
1610            Metric::new_with_metadata(
1611                "exception_total",
1612                MetricKind::Incremental,
1613                MetricValue::Counter { value: 1.0 },
1614                metadata,
1615            )
1616            .with_timestamp(Some(ts()))
1617        );
1618    }
1619
1620    #[tokio::test]
1621    async fn multiple_metrics_with_multiple_templates() {
1622        let config = parse_config(
1623            r#"
1624            [[metrics]]
1625            type = "set"
1626            field = "status"
1627            name = "{{host}}_{{worker}}_status_set"
1628
1629            [[metrics]]
1630            type = "counter"
1631            field = "backtrace"
1632            name = "{{service}}_exception_total"
1633            namespace = "{{host}}"
1634            "#,
1635        );
1636
1637        let mut event = Event::Log(LogEvent::from("i am a log"));
1638        event
1639            .as_mut_log()
1640            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1641        event.as_mut_log().insert("status", "42");
1642        event.as_mut_log().insert("backtrace", "message");
1643        event.as_mut_log().insert("host", "local");
1644        event.as_mut_log().insert("worker", "abc");
1645        event.as_mut_log().insert("service", "xyz");
1646        let mut metadata =
1647            event
1648                .metadata()
1649                .clone()
1650                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1651                    None,
1652                    None,
1653                    Some(ORIGIN_SERVICE_VALUE),
1654                ));
1655
1656        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1657        metadata.set_schema_definition(&Arc::new(Definition::any()));
1658        set_test_source_metadata(&mut metadata);
1659
1660        let output = do_transform_multiple_events(config, event, 2).await;
1661
1662        assert_eq!(2, output.len());
1663        assert_eq!(
1664            output[0].as_metric(),
1665            &Metric::new_with_metadata(
1666                "local_abc_status_set",
1667                MetricKind::Incremental,
1668                MetricValue::Set {
1669                    values: vec!["42".into()].into_iter().collect()
1670                },
1671                metadata.clone(),
1672            )
1673            .with_timestamp(Some(ts()))
1674        );
1675        assert_eq!(
1676            output[1].as_metric(),
1677            &Metric::new_with_metadata(
1678                "xyz_exception_total",
1679                MetricKind::Incremental,
1680                MetricValue::Counter { value: 1.0 },
1681                metadata,
1682            )
1683            .with_namespace(Some("local"))
1684            .with_timestamp(Some(ts()))
1685        );
1686    }
1687
1688    #[tokio::test]
1689    async fn user_ip_set() {
1690        let config = parse_config(
1691            r#"
1692            [[metrics]]
1693            type = "set"
1694            field = "user_ip"
1695            name = "unique_user_ip"
1696            "#,
1697        );
1698
1699        let event = create_event("user_ip", "1.2.3.4");
1700        let mut metadata =
1701            event
1702                .metadata()
1703                .clone()
1704                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1705                    None,
1706                    None,
1707                    Some(ORIGIN_SERVICE_VALUE),
1708                ));
1709        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1710        metadata.set_schema_definition(&Arc::new(Definition::any()));
1711        set_test_source_metadata(&mut metadata);
1712
1713        let metric = do_transform(config, event).await.unwrap();
1714
1715        assert_eq!(
1716            metric.into_metric(),
1717            Metric::new_with_metadata(
1718                "unique_user_ip",
1719                MetricKind::Incremental,
1720                MetricValue::Set {
1721                    values: vec!["1.2.3.4".into()].into_iter().collect()
1722                },
1723                metadata,
1724            )
1725            .with_timestamp(Some(ts()))
1726        );
1727    }
1728
1729    #[tokio::test]
1730    async fn response_time_histogram() {
1731        let config = parse_config(
1732            r#"
1733            [[metrics]]
1734            type = "histogram"
1735            field = "response_time"
1736            "#,
1737        );
1738
1739        let event = create_event("response_time", "2.5");
1740        let mut metadata =
1741            event
1742                .metadata()
1743                .clone()
1744                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1745                    None,
1746                    None,
1747                    Some(ORIGIN_SERVICE_VALUE),
1748                ));
1749
1750        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1751        metadata.set_schema_definition(&Arc::new(Definition::any()));
1752        set_test_source_metadata(&mut metadata);
1753
1754        let metric = do_transform(config, event).await.unwrap();
1755
1756        assert_eq!(
1757            metric.into_metric(),
1758            Metric::new_with_metadata(
1759                "response_time",
1760                MetricKind::Incremental,
1761                MetricValue::Distribution {
1762                    samples: vector_lib::samples![2.5 => 1],
1763                    statistic: StatisticKind::Histogram
1764                },
1765                metadata
1766            )
1767            .with_timestamp(Some(ts()))
1768        );
1769    }
1770
1771    #[tokio::test]
1772    async fn response_time_summary() {
1773        let config = parse_config(
1774            r#"
1775            [[metrics]]
1776            type = "summary"
1777            field = "response_time"
1778            "#,
1779        );
1780
1781        let event = create_event("response_time", "2.5");
1782        let mut metadata =
1783            event
1784                .metadata()
1785                .clone()
1786                .with_origin_metadata(DatadogMetricOriginMetadata::new(
1787                    None,
1788                    None,
1789                    Some(ORIGIN_SERVICE_VALUE),
1790                ));
1791
1792        // definitions aren't valid for metrics yet, it's just set to the default (anything).
1793        metadata.set_schema_definition(&Arc::new(Definition::any()));
1794        set_test_source_metadata(&mut metadata);
1795
1796        let metric = do_transform(config, event).await.unwrap();
1797
1798        assert_eq!(
1799            metric.into_metric(),
1800            Metric::new_with_metadata(
1801                "response_time",
1802                MetricKind::Incremental,
1803                MetricValue::Distribution {
1804                    samples: vector_lib::samples![2.5 => 1],
1805                    statistic: StatisticKind::Summary
1806                },
1807                metadata
1808            )
1809            .with_timestamp(Some(ts()))
1810        );
1811    }
1812
1813    //  Metric Metadata Tests
1814    //
1815    fn create_log_event(json_str: &str) -> Event {
1816        create_log_event_with_namespace(json_str, Some(TEST_NAMESPACE))
1817    }
1818
1819    fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1820        let mut log_value: Value =
1821            serde_json::from_str(json_str).expect("JSON was not well-formatted");
1822        log_value.insert("timestamp", ts());
1823
1824        if let Some(namespace) = namespace {
1825            log_value.insert("namespace", namespace);
1826        }
1827
1828        let mut metadata = EventMetadata::default();
1829        set_test_source_metadata(&mut metadata);
1830
1831        Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1832    }
1833
1834    #[tokio::test]
1835    async fn transform_gauge() {
1836        let config = LogToMetricConfig {
1837            metrics: None,
1838            all_metrics: Some(true),
1839        };
1840
1841        let json_str = r#"{
1842          "gauge": {
1843            "value": 990.0
1844          },
1845          "kind": "absolute",
1846          "name": "test.transform.gauge",
1847          "tags": {
1848            "env": "test_env",
1849            "host": "localhost"
1850          }
1851        }"#;
1852        let log = create_log_event(json_str);
1853        let metric = do_transform(config, log.clone()).await.unwrap();
1854        assert_eq!(
1855            *metric.as_metric(),
1856            Metric::new_with_metadata(
1857                "test.transform.gauge",
1858                MetricKind::Absolute,
1859                MetricValue::Gauge { value: 990.0 },
1860                metric.metadata().clone(),
1861            )
1862            .with_namespace(Some(TEST_NAMESPACE))
1863            .with_tags(Some(metric_tags!(
1864                "env" => "test_env",
1865                "host" => "localhost",
1866            )))
1867            .with_timestamp(Some(ts()))
1868        );
1869    }
1870
1871    #[tokio::test]
1872    async fn transform_histogram() {
1873        let config = LogToMetricConfig {
1874            metrics: None,
1875            all_metrics: Some(true),
1876        };
1877
1878        let json_str = r#"{
1879          "aggregated_histogram": {
1880            "sum": 18.0,
1881            "count": 5,
1882            "buckets": [
1883              {
1884                "upper_limit": 1.0,
1885                "count": 1
1886              },
1887              {
1888                "upper_limit": 2.0,
1889                "count": 2
1890              },
1891              {
1892                "upper_limit": 5.0,
1893                "count": 1
1894              },
1895              {
1896                "upper_limit": 10.0,
1897                "count": 1
1898              }
1899            ]
1900          },
1901          "kind": "absolute",
1902          "name": "test.transform.histogram",
1903          "tags": {
1904            "env": "test_env",
1905            "host": "localhost"
1906          }
1907        }"#;
1908        let log = create_log_event(json_str);
1909        let metric = do_transform(config, log.clone()).await.unwrap();
1910        assert_eq!(
1911            *metric.as_metric(),
1912            Metric::new_with_metadata(
1913                "test.transform.histogram",
1914                MetricKind::Absolute,
1915                MetricValue::AggregatedHistogram {
1916                    count: 5,
1917                    sum: 18.0,
1918                    buckets: vec![
1919                        Bucket {
1920                            upper_limit: 1.0,
1921                            count: 1,
1922                        },
1923                        Bucket {
1924                            upper_limit: 2.0,
1925                            count: 2,
1926                        },
1927                        Bucket {
1928                            upper_limit: 5.0,
1929                            count: 1,
1930                        },
1931                        Bucket {
1932                            upper_limit: 10.0,
1933                            count: 1,
1934                        },
1935                    ],
1936                },
1937                metric.metadata().clone(),
1938            )
1939            .with_namespace(Some(TEST_NAMESPACE))
1940            .with_tags(Some(metric_tags!(
1941                "env" => "test_env",
1942                "host" => "localhost",
1943            )))
1944            .with_timestamp(Some(ts()))
1945        );
1946    }
1947
1948    #[tokio::test]
1949    async fn transform_distribution_histogram() {
1950        let config = LogToMetricConfig {
1951            metrics: None,
1952            all_metrics: Some(true),
1953        };
1954
1955        let json_str = r#"{
1956          "distribution": {
1957            "samples": [
1958              {
1959                "value": 1.0,
1960                "rate": 1
1961              },
1962              {
1963                "value": 2.0,
1964                "rate": 2
1965              }
1966            ],
1967            "statistic": "histogram"
1968          },
1969          "kind": "absolute",
1970          "name": "test.transform.distribution_histogram",
1971          "tags": {
1972            "env": "test_env",
1973            "host": "localhost"
1974          }
1975        }"#;
1976        let log = create_log_event(json_str);
1977        let metric = do_transform(config, log.clone()).await.unwrap();
1978        assert_eq!(
1979            *metric.as_metric(),
1980            Metric::new_with_metadata(
1981                "test.transform.distribution_histogram",
1982                MetricKind::Absolute,
1983                MetricValue::Distribution {
1984                    samples: vec![
1985                        Sample {
1986                            value: 1.0,
1987                            rate: 1
1988                        },
1989                        Sample {
1990                            value: 2.0,
1991                            rate: 2
1992                        },
1993                    ],
1994                    statistic: StatisticKind::Histogram,
1995                },
1996                metric.metadata().clone(),
1997            )
1998            .with_namespace(Some(TEST_NAMESPACE))
1999            .with_tags(Some(metric_tags!(
2000                "env" => "test_env",
2001                "host" => "localhost",
2002            )))
2003            .with_timestamp(Some(ts()))
2004        );
2005    }
2006
2007    #[tokio::test]
2008    async fn transform_distribution_summary() {
2009        let config = LogToMetricConfig {
2010            metrics: None,
2011            all_metrics: Some(true),
2012        };
2013
2014        let json_str = r#"{
2015          "distribution": {
2016            "samples": [
2017              {
2018                "value": 1.0,
2019                "rate": 1
2020              },
2021              {
2022                "value": 2.0,
2023                "rate": 2
2024              }
2025            ],
2026            "statistic": "summary"
2027          },
2028          "kind": "absolute",
2029          "name": "test.transform.distribution_summary",
2030          "tags": {
2031            "env": "test_env",
2032            "host": "localhost"
2033          }
2034        }"#;
2035        let log = create_log_event(json_str);
2036        let metric = do_transform(config, log.clone()).await.unwrap();
2037        assert_eq!(
2038            *metric.as_metric(),
2039            Metric::new_with_metadata(
2040                "test.transform.distribution_summary",
2041                MetricKind::Absolute,
2042                MetricValue::Distribution {
2043                    samples: vec![
2044                        Sample {
2045                            value: 1.0,
2046                            rate: 1
2047                        },
2048                        Sample {
2049                            value: 2.0,
2050                            rate: 2
2051                        },
2052                    ],
2053                    statistic: StatisticKind::Summary,
2054                },
2055                metric.metadata().clone(),
2056            )
2057            .with_namespace(Some(TEST_NAMESPACE))
2058            .with_tags(Some(metric_tags!(
2059                "env" => "test_env",
2060                "host" => "localhost",
2061            )))
2062            .with_timestamp(Some(ts()))
2063        );
2064    }
2065
2066    #[tokio::test]
2067    async fn transform_summary() {
2068        let config = LogToMetricConfig {
2069            metrics: None,
2070            all_metrics: Some(true),
2071        };
2072
2073        let json_str = r#"{
2074          "aggregated_summary": {
2075            "sum": 100.0,
2076            "count": 7,
2077            "quantiles": [
2078              {
2079                "quantile": 0.05,
2080                "value": 10.0
2081              },
2082              {
2083                "quantile": 0.95,
2084                "value": 25.0
2085              }
2086            ]
2087          },
2088          "kind": "absolute",
2089          "name": "test.transform.histogram",
2090          "tags": {
2091            "env": "test_env",
2092            "host": "localhost"
2093          }
2094        }"#;
2095        let log = create_log_event(json_str);
2096        let metric = do_transform(config, log.clone()).await.unwrap();
2097        assert_eq!(
2098            *metric.as_metric(),
2099            Metric::new_with_metadata(
2100                "test.transform.histogram",
2101                MetricKind::Absolute,
2102                MetricValue::AggregatedSummary {
2103                    quantiles: vec![
2104                        Quantile {
2105                            quantile: 0.05,
2106                            value: 10.0,
2107                        },
2108                        Quantile {
2109                            quantile: 0.95,
2110                            value: 25.0,
2111                        },
2112                    ],
2113                    count: 7,
2114                    sum: 100.0,
2115                },
2116                metric.metadata().clone(),
2117            )
2118            .with_namespace(Some(TEST_NAMESPACE))
2119            .with_tags(Some(metric_tags!(
2120                "env" => "test_env",
2121                "host" => "localhost",
2122            )))
2123            .with_timestamp(Some(ts()))
2124        );
2125    }
2126
2127    #[tokio::test]
2128    async fn transform_counter() {
2129        let config = LogToMetricConfig {
2130            metrics: None,
2131            all_metrics: Some(true),
2132        };
2133
2134        let json_str = r#"{
2135          "counter": {
2136            "value": 10.0
2137          },
2138          "kind": "incremental",
2139          "name": "test.transform.counter",
2140          "tags": {
2141            "env": "test_env",
2142            "host": "localhost"
2143          }
2144        }"#;
2145        let log = create_log_event(json_str);
2146        let metric = do_transform(config, log.clone()).await.unwrap();
2147        assert_eq!(
2148            *metric.as_metric(),
2149            Metric::new_with_metadata(
2150                "test.transform.counter",
2151                MetricKind::Incremental,
2152                MetricValue::Counter { value: 10.0 },
2153                metric.metadata().clone(),
2154            )
2155            .with_namespace(Some(TEST_NAMESPACE))
2156            .with_tags(Some(metric_tags!(
2157                "env" => "test_env",
2158                "host" => "localhost",
2159            )))
2160            .with_timestamp(Some(ts()))
2161        );
2162    }
2163
2164    #[tokio::test]
2165    async fn transform_set() {
2166        let config = LogToMetricConfig {
2167            metrics: None,
2168            all_metrics: Some(true),
2169        };
2170
2171        let json_str = r#"{
2172          "set": {
2173            "values": ["990.0", "1234"]
2174          },
2175          "kind": "incremental",
2176          "name": "test.transform.set",
2177          "tags": {
2178            "env": "test_env",
2179            "host": "localhost"
2180          }
2181        }"#;
2182        let log = create_log_event(json_str);
2183        let metric = do_transform(config, log.clone()).await.unwrap();
2184        assert_eq!(
2185            *metric.as_metric(),
2186            Metric::new_with_metadata(
2187                "test.transform.set",
2188                MetricKind::Incremental,
2189                MetricValue::Set {
2190                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2191                },
2192                metric.metadata().clone(),
2193            )
2194            .with_namespace(Some(TEST_NAMESPACE))
2195            .with_tags(Some(metric_tags!(
2196                "env" => "test_env",
2197                "host" => "localhost",
2198            )))
2199            .with_timestamp(Some(ts()))
2200        );
2201    }
2202
2203    #[tokio::test]
2204    async fn transform_all_metrics_optional_namespace() {
2205        let config = LogToMetricConfig {
2206            metrics: None,
2207            all_metrics: Some(true),
2208        };
2209
2210        let json_str = r#"{
2211          "counter": {
2212            "value": 10.0
2213          },
2214          "kind": "incremental",
2215          "name": "test.transform.counter",
2216          "tags": {
2217            "env": "test_env",
2218            "host": "localhost"
2219          }
2220        }"#;
2221        let log = create_log_event_with_namespace(json_str, None);
2222        let metric = do_transform(config, log.clone()).await.unwrap();
2223        assert_eq!(
2224            *metric.as_metric(),
2225            Metric::new_with_metadata(
2226                "test.transform.counter",
2227                MetricKind::Incremental,
2228                MetricValue::Counter { value: 10.0 },
2229                metric.metadata().clone(),
2230            )
2231            .with_tags(Some(metric_tags!(
2232                "env" => "test_env",
2233                "host" => "localhost",
2234            )))
2235            .with_timestamp(Some(ts()))
2236        );
2237    }
2238}