1use std::{collections::HashMap, num::ParseFloatError, sync::Arc};
2
3use chrono::Utc;
4use indexmap::IndexMap;
5use vector_lib::{
6 config::LogNamespace,
7 configurable::configurable_component,
8 event::{
9 DatadogMetricOriginMetadata, LogEvent,
10 metric::{Bucket, Quantile, Sample},
11 },
12};
13use vrl::{
14 event_path, path,
15 path::{PathParseError, parse_target_path},
16};
17
18use crate::{
19 common::expansion::pair_expansion,
20 config::{
21 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
22 TransformOutput, schema::Definition,
23 },
24 event::{
25 Event, Value,
26 metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
27 },
28 internal_events::{
29 DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
30 MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
31 MetricMetadataParseError, ParserMissingFieldError,
32 },
33 schema,
34 template::{Template, TemplateRenderingError},
35 transforms::{
36 FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
37 },
38};
39
40const ORIGIN_SERVICE_VALUE: u32 = 3;
41
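/// Configuration for the `log_to_metric` transform.
///
/// A minimal illustrative configuration (component IDs such as `my_transform_id` and
/// `my_source_id` are placeholders), mirroring the counter configurations exercised in the
/// tests at the bottom of this file:
///
/// ```toml
/// [transforms.my_transform_id]
/// type = "log_to_metric"
/// inputs = ["my_source_id"]
///
/// [[transforms.my_transform_id.metrics]]
/// type = "counter"
/// field = "status"
/// ```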
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate.
    pub metrics: Option<Vec<MetricConfig>>,

    /// Whether to treat every incoming log event as a self-describing metric.
    ///
    /// When enabled, the `metrics` list is ignored and each event is expected to carry the
    /// metric itself: a `name`, a `kind` (`absolute` or `incremental`), optional `namespace`
    /// and `tags`, and a value object such as `counter`, `gauge`, `set`, `distribution`,
    /// `histogram`, or `summary`.
    pub all_metrics: Option<bool>,
}

/// Specification of a counter derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the value in `field`, instead of only by `1`.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}

/// Specification of a metric derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Name of the field in the event to generate the metric from.
    pub field: Template,

    /// Overrides the name of the metric.
    ///
    /// If not specified, `field` is used as the name of the metric.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}

/// Specification of the value of a created tag.
///
/// This may be a single value, a `null` for a bare tag, or an array of either.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    Plain(Option<Template>),

    /// An array of values to give to the same tag name.
    Multi(Vec<Option<Template>>),
}

/// Specification of the type of an individual metric, and any associated data.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    Counter(CounterConfig),

    /// A histogram.
    Histogram,

    /// A gauge.
    Gauge,

    /// A set.
    Set,

    /// A summary.
    Summary,
}
160
161impl MetricConfig {
162 fn field(&self) -> &str {
163 self.field.get_ref()
164 }
165}
166
167const fn default_increment_by_value() -> bool {
168 false
169}
170
171const fn default_kind() -> MetricKind {
172 MetricKind::Incremental
173}
174
175#[derive(Debug, Clone)]
176pub struct LogToMetric {
177 pub metrics: Vec<MetricConfig>,
178 pub all_metrics: bool,
179}
180
181impl GenerateConfig for LogToMetricConfig {
182 fn generate_config() -> toml::Value {
183 toml::Value::try_from(Self {
184 metrics: Some(vec![MetricConfig {
185 field: "field_name".try_into().expect("Fixed template"),
186 name: None,
187 namespace: None,
188 tags: None,
189 metric: MetricTypeConfig::Counter(CounterConfig {
190 increment_by_value: false,
191 kind: MetricKind::Incremental,
192 }),
193 }]),
194 all_metrics: Some(true),
195 })
196 .unwrap()
197 }
198}
199
200#[async_trait::async_trait]
201#[typetag::serde(name = "log_to_metric")]
202impl TransformConfig for LogToMetricConfig {
203 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
204 Ok(Transform::function(LogToMetric {
205 metrics: self.metrics.clone().unwrap_or_default(),
206 all_metrics: self.all_metrics.unwrap_or_default(),
207 }))
208 }
209
210 fn input(&self) -> Input {
211 Input::log()
212 }
213
214 fn outputs(
215 &self,
216 _: vector_lib::enrichment::TableRegistry,
217 _: &[(OutputId, schema::Definition)],
218 _: LogNamespace,
219 ) -> Vec<TransformOutput> {
220 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
222 }
223
224 fn enable_concurrency(&self) -> bool {
225 true
226 }
227}
228
/// The kind of parse error encountered when converting a log field into a metric value.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// The value could not be parsed as a float.
    FloatError,

    /// The value could not be parsed as an integer.
    IntError,

    /// The value could not be parsed as an array.
    ArrayError,
}
240
241impl std::fmt::Display for TransformParseErrorKind {
242 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
243 write!(f, "{self:?}")
244 }
245}
246
247enum TransformError {
248 PathNotFound {
249 path: String,
250 },
251 PathNull {
252 path: String,
253 },
254 MetricDetailsNotFound,
255 MetricValueError {
256 path: String,
257 path_value: String,
258 },
259 ParseError {
260 path: String,
261 kind: TransformParseErrorKind,
262 },
263 ParseFloatError {
264 path: String,
265 error: ParseFloatError,
266 },
267 TemplateRenderingError(TemplateRenderingError),
268 PairExpansionError {
269 key: String,
270 value: String,
271 error: serde_json::Error,
272 },
273}
274
275fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
276 template
277 .render_string(event)
278 .map_err(TransformError::TemplateRenderingError)
279}
280
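/// Renders all configured tag templates against the event.
///
/// Returns `None` when no tags are configured or none survive rendering. Literal and
/// wildcard-expanded pairs are collected into separate maps by `pair_expansion` so that
/// collisions between the two can be detected and reported below.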
281fn render_tags(
282 tags: &Option<IndexMap<Template, TagConfig>>,
283 event: &Event,
284) -> Result<Option<MetricTags>, TransformError> {
285 let mut static_tags: HashMap<String, String> = HashMap::new();
286 let mut dynamic_tags: HashMap<String, String> = HashMap::new();
287 Ok(match tags {
288 None => None,
289 Some(tags) => {
290 let mut result = MetricTags::default();
291 for (name, config) in tags {
292 match config {
293 TagConfig::Plain(template) => {
294 render_tag_into(
295 event,
296 name,
297 template.as_ref(),
298 &mut result,
299 &mut static_tags,
300 &mut dynamic_tags,
301 )?;
302 }
303 TagConfig::Multi(vec) => {
304 for template in vec {
305 render_tag_into(
306 event,
307 name,
308 template.as_ref(),
309 &mut result,
310 &mut static_tags,
311 &mut dynamic_tags,
312 )?;
313 }
314 }
315 }
316 }
            for (k, v) in static_tags {
                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
                    warn!(
                        "Static tags override dynamic tags. \
                        key: {}, value: {:?}, discarded value: {:?}",
                        k, v, discarded_v
                    );
                };
            }
326 result.as_option()
327 }
328 })
329}
330
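/// Renders a single `key => value` tag template pair into `result`.
///
/// Template rendering failures emit a `TemplateRenderingError` internal event and skip the
/// tag instead of failing the whole event. Keys containing a `*` wildcard are expanded by
/// `pair_expansion` into one tag per key of the rendered object value (see the
/// `count_http_requests_with_tags_expansion` test below for a concrete example).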
331fn render_tag_into(
332 event: &Event,
333 key_template: &Template,
334 value_template: Option<&Template>,
335 result: &mut MetricTags,
336 static_tags: &mut HashMap<String, String>,
337 dynamic_tags: &mut HashMap<String, String>,
338) -> Result<(), TransformError> {
339 let key = match render_template(key_template, event) {
340 Ok(key_s) => key_s,
341 Err(TransformError::TemplateRenderingError(err)) => {
342 emit!(crate::internal_events::TemplateRenderingError {
343 error: err,
344 drop_event: false,
345 field: Some(key_template.get_ref()),
346 });
347 return Ok(());
348 }
349 Err(err) => return Err(err),
350 };
351 match value_template {
352 None => {
353 result.insert(key, TagValue::Bare);
354 }
355 Some(template) => match render_template(template, event) {
356 Ok(value) => {
357 let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
358 .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
359 result.extend(expanded_pairs);
360 }
361 Err(TransformError::TemplateRenderingError(value_error)) => {
362 emit!(crate::internal_events::TemplateRenderingError {
363 error: value_error,
364 drop_event: false,
365 field: Some(template.get_ref()),
366 });
367 return Ok(());
368 }
369 Err(other) => return Err(other),
370 },
371 };
372 Ok(())
373}
374
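/// Converts a log event into a single metric according to one `MetricConfig` entry (the
/// default, non-`all_metrics` mode). Counter (with `increment_by_value`), gauge, histogram,
/// and summary values are parsed from the string form of the target field; sets use the
/// string value as-is.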
375fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
376 let log = event.as_log();
377
378 let timestamp = log
379 .get_timestamp()
380 .and_then(Value::as_timestamp)
381 .cloned()
382 .or_else(|| Some(Utc::now()));
383
384 let metadata = event
386 .metadata()
387 .clone()
388 .with_schema_definition(&Arc::new(Definition::any()))
389 .with_origin_metadata(DatadogMetricOriginMetadata::new(
390 None,
391 None,
392 Some(ORIGIN_SERVICE_VALUE),
393 ));
394
395 let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
396 path: config.field().to_string(),
397 })?;
398
399 let value = match log.get(&field) {
400 None => Err(TransformError::PathNotFound {
401 path: field.to_string(),
402 }),
403 Some(Value::Null) => Err(TransformError::PathNull {
404 path: field.to_string(),
405 }),
406 Some(value) => Ok(value),
407 }?;
408
409 let name = config.name.as_ref().unwrap_or(&config.field);
410 let name = render_template(name, event)?;
411
412 let namespace = config.namespace.as_ref();
413 let namespace = namespace
414 .map(|namespace| render_template(namespace, event))
415 .transpose()?;
416
417 let tags = render_tags(&config.tags, event)?;
418
419 let (kind, value) = match &config.metric {
420 MetricTypeConfig::Counter(counter) => {
421 let value = if counter.increment_by_value {
422 value.to_string_lossy().parse().map_err(|error| {
423 TransformError::ParseFloatError {
424 path: config.field.get_ref().to_owned(),
425 error,
426 }
427 })?
428 } else {
429 1.0
430 };
431
432 (counter.kind, MetricValue::Counter { value })
433 }
434 MetricTypeConfig::Histogram => {
435 let value = value.to_string_lossy().parse().map_err(|error| {
436 TransformError::ParseFloatError {
437 path: field.to_string(),
438 error,
439 }
440 })?;
441
442 (
443 MetricKind::Incremental,
444 MetricValue::Distribution {
445 samples: vector_lib::samples![value => 1],
446 statistic: StatisticKind::Histogram,
447 },
448 )
449 }
450 MetricTypeConfig::Summary => {
451 let value = value.to_string_lossy().parse().map_err(|error| {
452 TransformError::ParseFloatError {
453 path: field.to_string(),
454 error,
455 }
456 })?;
457
458 (
459 MetricKind::Incremental,
460 MetricValue::Distribution {
461 samples: vector_lib::samples![value => 1],
462 statistic: StatisticKind::Summary,
463 },
464 )
465 }
466 MetricTypeConfig::Gauge => {
467 let value = value.to_string_lossy().parse().map_err(|error| {
468 TransformError::ParseFloatError {
469 path: field.to_string(),
470 error,
471 }
472 })?;
473
474 (MetricKind::Absolute, MetricValue::Gauge { value })
475 }
476 MetricTypeConfig::Set => {
477 let value = value.to_string_lossy().into_owned();
478
479 (
480 MetricKind::Incremental,
481 MetricValue::Set {
482 values: std::iter::once(value).collect(),
483 },
484 )
485 }
486 };
487 Ok(Metric::new_with_metadata(name, kind, value, metadata)
488 .with_namespace(namespace)
489 .with_tags(tags)
490 .with_timestamp(timestamp))
491}
492
493fn bytes_to_str(value: &Value) -> Option<String> {
494 match value {
495 Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
496 _ => None,
497 }
498}
499
500fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
501 let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
503 PathParseError::InvalidPathSyntax { path } => PathNotFound {
504 path: path.to_string(),
505 },
506 })?;
507 match maybe_value {
508 None => Err(PathNotFound {
509 path: path.to_string(),
510 }),
511 Some(v) => Ok(bytes_to_str(v)),
512 }
513}
514
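// The `get_*_value` helpers below support the `all_metrics` mode: each reads a
// self-describing payload from the log event (`counter.value`, `gauge.value`, `set.values`,
// `distribution.samples`/`statistic`, `histogram.buckets`/`count`/`sum`,
// `summary.quantiles`/`count`/`sum`) and converts it into the corresponding `MetricValue`.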
515fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
516 let counter_value = log
517 .get(event_path!("counter", "value"))
518 .ok_or_else(|| TransformError::PathNotFound {
519 path: "counter.value".to_string(),
520 })?
521 .as_float()
522 .ok_or_else(|| TransformError::ParseError {
523 path: "counter.value".to_string(),
524 kind: TransformParseErrorKind::FloatError,
525 })?;
526
527 Ok(MetricValue::Counter {
528 value: *counter_value,
529 })
530}
531
532fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
533 let gauge_value = log
534 .get(event_path!("gauge", "value"))
535 .ok_or_else(|| TransformError::PathNotFound {
536 path: "gauge.value".to_string(),
537 })?
538 .as_float()
539 .ok_or_else(|| TransformError::ParseError {
540 path: "gauge.value".to_string(),
541 kind: TransformParseErrorKind::FloatError,
542 })?;
543 Ok(MetricValue::Gauge {
544 value: *gauge_value,
545 })
546}
547
548fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
549 let set_values = log
550 .get(event_path!("set", "values"))
551 .ok_or_else(|| TransformError::PathNotFound {
552 path: "set.values".to_string(),
553 })?
554 .as_array()
555 .ok_or_else(|| TransformError::ParseError {
556 path: "set.values".to_string(),
557 kind: TransformParseErrorKind::ArrayError,
558 })?;
559
560 let mut values: Vec<String> = Vec::new();
561 for e_value in set_values {
562 let value = e_value
563 .as_bytes()
564 .ok_or_else(|| TransformError::ParseError {
565 path: "set.values".to_string(),
566 kind: TransformParseErrorKind::ArrayError,
567 })?;
568 values.push(String::from_utf8_lossy(value).to_string());
569 }
570
571 Ok(MetricValue::Set {
572 values: values.into_iter().collect(),
573 })
574}
575
576fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
577 let event_samples = log
578 .get(event_path!("distribution", "samples"))
579 .ok_or_else(|| TransformError::PathNotFound {
580 path: "distribution.samples".to_string(),
581 })?
582 .as_array()
583 .ok_or_else(|| TransformError::ParseError {
584 path: "distribution.samples".to_string(),
585 kind: TransformParseErrorKind::ArrayError,
586 })?;
587
588 let mut samples: Vec<Sample> = Vec::new();
589 for e_sample in event_samples {
590 let value = e_sample
591 .get(path!("value"))
592 .ok_or_else(|| TransformError::PathNotFound {
593 path: "value".to_string(),
594 })?
595 .as_float()
596 .ok_or_else(|| TransformError::ParseError {
597 path: "value".to_string(),
598 kind: TransformParseErrorKind::FloatError,
599 })?;
600
601 let rate = e_sample
602 .get(path!("rate"))
603 .ok_or_else(|| TransformError::PathNotFound {
604 path: "rate".to_string(),
605 })?
606 .as_integer()
607 .ok_or_else(|| TransformError::ParseError {
608 path: "rate".to_string(),
609 kind: TransformParseErrorKind::IntError,
610 })?;
611
612 samples.push(Sample {
613 value: *value,
614 rate: rate as u32,
615 });
616 }
617
618 let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
619 Some(n) => n,
620 None => {
621 return Err(TransformError::PathNotFound {
622 path: "distribution.statistic".to_string(),
623 });
624 }
625 };
626 let statistic_kind = match statistic_str.as_str() {
627 "histogram" => Ok(StatisticKind::Histogram),
628 "summary" => Ok(StatisticKind::Summary),
629 _ => Err(TransformError::MetricValueError {
630 path: "distribution.statistic".to_string(),
631 path_value: statistic_str.to_string(),
632 }),
633 }?;
634
635 Ok(MetricValue::Distribution {
636 samples,
637 statistic: statistic_kind,
638 })
639}
640
641fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
642 let event_buckets = log
643 .get(event_path!("histogram", "buckets"))
644 .ok_or_else(|| TransformError::PathNotFound {
645 path: "histogram.buckets".to_string(),
646 })?
647 .as_array()
648 .ok_or_else(|| TransformError::ParseError {
649 path: "histogram.buckets".to_string(),
650 kind: TransformParseErrorKind::ArrayError,
651 })?;
652
653 let mut buckets: Vec<Bucket> = Vec::new();
654 for e_bucket in event_buckets {
655 let upper_limit = e_bucket
656 .get(path!("upper_limit"))
657 .ok_or_else(|| TransformError::PathNotFound {
658 path: "histogram.buckets.upper_limit".to_string(),
659 })?
660 .as_float()
661 .ok_or_else(|| TransformError::ParseError {
662 path: "histogram.buckets.upper_limit".to_string(),
663 kind: TransformParseErrorKind::FloatError,
664 })?;
665
666 let count = e_bucket
667 .get(path!("count"))
668 .ok_or_else(|| TransformError::PathNotFound {
669 path: "histogram.buckets.count".to_string(),
670 })?
671 .as_integer()
672 .ok_or_else(|| TransformError::ParseError {
673 path: "histogram.buckets.count".to_string(),
674 kind: TransformParseErrorKind::IntError,
675 })?;
676
677 buckets.push(Bucket {
678 upper_limit: *upper_limit,
679 count: count as u64,
680 });
681 }
682
683 let count = log
684 .get(event_path!("histogram", "count"))
685 .ok_or_else(|| TransformError::PathNotFound {
686 path: "histogram.count".to_string(),
687 })?
688 .as_integer()
689 .ok_or_else(|| TransformError::ParseError {
690 path: "histogram.count".to_string(),
691 kind: TransformParseErrorKind::IntError,
692 })?;
693
694 let sum = log
695 .get(event_path!("histogram", "sum"))
696 .ok_or_else(|| TransformError::PathNotFound {
697 path: "histogram.sum".to_string(),
698 })?
699 .as_float()
700 .ok_or_else(|| TransformError::ParseError {
701 path: "histogram.sum".to_string(),
702 kind: TransformParseErrorKind::FloatError,
703 })?;
704
705 Ok(MetricValue::AggregatedHistogram {
706 buckets,
707 count: count as u64,
708 sum: *sum,
709 })
710}
711
712fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
713 let event_quantiles = log
714 .get(event_path!("summary", "quantiles"))
715 .ok_or_else(|| TransformError::PathNotFound {
716 path: "summary.quantiles".to_string(),
717 })?
718 .as_array()
719 .ok_or_else(|| TransformError::ParseError {
720 path: "summary.quantiles".to_string(),
721 kind: TransformParseErrorKind::ArrayError,
722 })?;
723
724 let mut quantiles: Vec<Quantile> = Vec::new();
725 for e_quantile in event_quantiles {
726 let quantile = e_quantile
727 .get(path!("quantile"))
728 .ok_or_else(|| TransformError::PathNotFound {
729 path: "summary.quantiles.quantile".to_string(),
730 })?
731 .as_float()
732 .ok_or_else(|| TransformError::ParseError {
733 path: "summary.quantiles.quantile".to_string(),
734 kind: TransformParseErrorKind::FloatError,
735 })?;
736
737 let value = e_quantile
738 .get(path!("value"))
739 .ok_or_else(|| TransformError::PathNotFound {
740 path: "summary.quantiles.value".to_string(),
741 })?
742 .as_float()
743 .ok_or_else(|| TransformError::ParseError {
744 path: "summary.quantiles.value".to_string(),
745 kind: TransformParseErrorKind::FloatError,
746 })?;
747
748 quantiles.push(Quantile {
749 quantile: *quantile,
750 value: *value,
751 })
752 }
753
754 let count = log
755 .get(event_path!("summary", "count"))
756 .ok_or_else(|| TransformError::PathNotFound {
757 path: "summary.count".to_string(),
758 })?
759 .as_integer()
760 .ok_or_else(|| TransformError::ParseError {
761 path: "summary.count".to_string(),
762 kind: TransformParseErrorKind::IntError,
763 })?;
764
765 let sum = log
766 .get(event_path!("summary", "sum"))
767 .ok_or_else(|| TransformError::PathNotFound {
768 path: "summary.sum".to_string(),
769 })?
770 .as_float()
771 .ok_or_else(|| TransformError::ParseError {
772 path: "summary.sum".to_string(),
773 kind: TransformParseErrorKind::FloatError,
774 })?;
775
776 Ok(MetricValue::AggregatedSummary {
777 quantiles,
778 count: count as u64,
779 sum: *sum,
780 })
781}
782
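/// Converts a self-describing metric event into a `Metric` (the `all_metrics` mode).
///
/// The event is expected to carry a `name`, a `kind` (`absolute` or `incremental`), optional
/// `namespace` and `tags`, and one value object: `counter`, `gauge`, `set`, `distribution`,
/// `histogram`, or `summary`. The `transform_*` tests below show concrete JSON examples.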
783fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
784 let log = event.as_log();
785 let timestamp = log
786 .get_timestamp()
787 .and_then(Value::as_timestamp)
788 .cloned()
789 .or_else(|| Some(Utc::now()));
790
791 let name = match try_get_string_from_log(log, "name")? {
792 Some(n) => n,
793 None => {
794 return Err(TransformError::PathNotFound {
795 path: "name".to_string(),
796 });
797 }
798 };
799
800 let mut tags = MetricTags::default();
801
802 if let Some(els) = log.get(event_path!("tags"))
803 && let Some(el) = els.as_object()
804 {
805 for (key, value) in el {
806 tags.insert(key.to_string(), bytes_to_str(value));
807 }
808 }
809 let tags_result = Some(tags);
810
811 let kind_str = match try_get_string_from_log(log, "kind")? {
812 Some(n) => n,
813 None => {
814 return Err(TransformError::PathNotFound {
815 path: "kind".to_string(),
816 });
817 }
818 };
819
820 let kind = match kind_str.as_str() {
821 "absolute" => Ok(MetricKind::Absolute),
822 "incremental" => Ok(MetricKind::Incremental),
823 value => Err(TransformError::MetricValueError {
824 path: "kind".to_string(),
825 path_value: value.to_string(),
826 }),
827 }?;
828
829 let mut value: Option<MetricValue> = None;
830 if let Some(root_event) = log.as_map() {
831 for key in root_event.keys() {
832 value = match key.as_str() {
833 "gauge" => Some(get_gauge_value(log)?),
834 "distribution" => Some(get_distribution_value(log)?),
835 "histogram" => Some(get_histogram_value(log)?),
836 "summary" => Some(get_summary_value(log)?),
837 "counter" => Some(get_counter_value(log)?),
838 "set" => Some(get_set_value(log)?),
839 _ => None,
840 };
841
842 if value.is_some() {
843 break;
844 }
845 }
846 }
847
848 let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
849
850 let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
851 .with_tags(tags_result)
852 .with_timestamp(timestamp);
853
854 if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
855 metric = metric.with_namespace(namespace);
856 }
857
858 Ok(metric)
859}
860
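// Dispatches on `all_metrics`: when enabled, the whole event is treated as one
// self-describing metric via `to_metrics`; otherwise each configured `MetricConfig` entry is
// applied via `to_metric_with_config`. Conversion errors emit internal events and drop the
// event's output.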
861impl FunctionTransform for LogToMetric {
862 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
863 let mut buffer = Vec::with_capacity(self.metrics.len());
865 if self.all_metrics {
866 match to_metrics(&event) {
867 Ok(metric) => {
868 output.push(Event::Metric(metric));
869 }
870 Err(err) => {
871 match err {
872 TransformError::MetricValueError { path, path_value } => {
873 emit!(MetricMetadataInvalidFieldValueError {
874 field: path.as_ref(),
875 field_value: path_value.as_ref()
876 })
877 }
878 TransformError::PathNotFound { path } => {
879 emit!(ParserMissingFieldError::<DROP_EVENT> {
880 field: path.as_ref()
881 })
882 }
883 TransformError::ParseError { path, kind } => {
884 emit!(MetricMetadataParseError {
885 field: path.as_ref(),
886 kind: &kind.to_string(),
887 })
888 }
889 TransformError::MetricDetailsNotFound => {
890 emit!(MetricMetadataMetricDetailsNotFoundError {})
891 }
892 TransformError::PairExpansionError { key, value, error } => {
893 emit!(crate::internal_events::PairExpansionError {
894 key: &key,
895 value: &value,
896 drop_event: true,
897 error
898 })
899 }
900 _ => {}
901 };
902 }
903 }
904 } else {
905 for config in self.metrics.iter() {
906 match to_metric_with_config(config, &event) {
907 Ok(metric) => {
908 buffer.push(Event::Metric(metric));
909 }
910 Err(err) => {
911 match err {
912 TransformError::PathNull { path } => {
913 emit!(LogToMetricFieldNullError {
914 field: path.as_ref()
915 })
916 }
917 TransformError::PathNotFound { path } => {
918 emit!(ParserMissingFieldError::<DROP_EVENT> {
919 field: path.as_ref()
920 })
921 }
922 TransformError::ParseFloatError { path, error } => {
923 emit!(LogToMetricParseFloatError {
924 field: path.as_ref(),
925 error
926 })
927 }
928 TransformError::TemplateRenderingError(error) => {
929 emit!(crate::internal_events::TemplateRenderingError {
930 error,
931 drop_event: true,
932 field: None,
933 })
934 }
935 TransformError::PairExpansionError { key, value, error } => {
936 emit!(crate::internal_events::PairExpansionError {
937 key: &key,
938 value: &value,
939 drop_event: true,
940 error
941 })
942 }
943 _ => {}
944 };
945 return;
947 }
948 }
949 }
950 }
951
952 for event in buffer {
954 output.push(event);
955 }
956 }
957}
958
959#[cfg(test)]
960mod tests {
961 use std::{sync::Arc, time::Duration};
962
963 use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
964 use tokio::sync::mpsc;
965 use tokio_stream::wrappers::ReceiverStream;
966 use vector_lib::{
967 config::ComponentKey,
968 event::{EventMetadata, ObjectMap},
969 metric_tags,
970 };
971
972 use super::*;
973 use crate::{
974 config::log_schema,
975 event::{
976 Event, LogEvent,
977 metric::{Metric, MetricKind, MetricValue, StatisticKind},
978 },
979 test_util::components::assert_transform_compliance,
980 transforms::test::create_topology,
981 };
982
983 #[test]
984 fn generate_config() {
985 crate::test_util::test_generate_config::<LogToMetricConfig>();
986 }
987
988 fn parse_config(s: &str) -> LogToMetricConfig {
989 toml::from_str(s).unwrap()
990 }
991
992 fn parse_yaml_config(s: &str) -> LogToMetricConfig {
993 serde_yaml::from_str(s).unwrap()
994 }
995
996 fn ts() -> DateTime<Utc> {
997 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
998 .single()
999 .and_then(|t| t.with_nanosecond(11))
1000 .expect("invalid timestamp")
1001 }
1002
1003 fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1004 let mut log = Event::Log(LogEvent::from("i am a log"));
1005 log.as_mut_log().insert(key, value);
1006 log.as_mut_log()
1007 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1008 log
1009 }
1010
1011 async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1012 assert_transform_compliance(async move {
1013 let (tx, rx) = mpsc::channel(1);
1014 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1015 tx.send(event).await.unwrap();
1016 let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1017 .await
1018 .unwrap_or(None);
1019 drop(tx);
1020 topology.stop().await;
1021 assert_eq!(out.recv().await, None);
1022 result
1023 })
1024 .await
1025 }
1026
1027 async fn do_transform_multiple_events(
1028 config: LogToMetricConfig,
1029 event: Event,
1030 count: usize,
1031 ) -> Vec<Event> {
1032 assert_transform_compliance(async move {
1033 let (tx, rx) = mpsc::channel(1);
1034 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1035 tx.send(event).await.unwrap();
1036
1037 let mut results = vec![];
1038 for _ in 0..count {
1039 let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1040 .await
1041 .unwrap_or(None);
1042 if let Some(event) = result {
1043 results.push(event);
1044 }
1045 }
1046
1047 drop(tx);
1048 topology.stop().await;
1049 assert_eq!(out.recv().await, None);
1050 results
1051 })
1052 .await
1053 }
1054
1055 #[tokio::test]
1056 async fn count_http_status_codes() {
1057 let config = parse_config(
1058 r#"
1059 [[metrics]]
1060 type = "counter"
1061 field = "status"
1062 "#,
1063 );
1064
1065 let event = create_event("status", "42");
1066 let mut metadata =
1067 event
1068 .metadata()
1069 .clone()
1070 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1071 None,
1072 None,
1073 Some(ORIGIN_SERVICE_VALUE),
1074 ));
1075 metadata.set_schema_definition(&Arc::new(Definition::any()));
1077 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1078 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1079 let metric = do_transform(config, event).await.unwrap();
1080
1081 assert_eq!(
1082 metric.into_metric(),
1083 Metric::new_with_metadata(
1084 "status",
1085 MetricKind::Incremental,
1086 MetricValue::Counter { value: 1.0 },
1087 metadata,
1088 )
1089 .with_timestamp(Some(ts()))
1090 );
1091 }
1092
1093 #[tokio::test]
1094 async fn count_http_requests_with_tags() {
1095 let config = parse_config(
1096 r#"
1097 [[metrics]]
1098 type = "counter"
1099 field = "message"
1100 name = "http_requests_total"
1101 namespace = "app"
1102 tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1103 "#,
1104 );
1105
1106 let mut event = create_event("message", "i am log");
1107 event.as_mut_log().insert("method", "post");
1108 event.as_mut_log().insert("code", "200");
1109 let mut metadata =
1110 event
1111 .metadata()
1112 .clone()
1113 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1114 None,
1115 None,
1116 Some(ORIGIN_SERVICE_VALUE),
1117 ));
1118 metadata.set_schema_definition(&Arc::new(Definition::any()));
1120 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1121 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1122
1123 let metric = do_transform(config, event).await.unwrap();
1124
1125 assert_eq!(
1126 metric.into_metric(),
1127 Metric::new_with_metadata(
1128 "http_requests_total",
1129 MetricKind::Incremental,
1130 MetricValue::Counter { value: 1.0 },
1131 metadata,
1132 )
1133 .with_namespace(Some("app"))
1134 .with_tags(Some(metric_tags!(
1135 "method" => "post",
1136 "code" => "200",
1137 "host" => "localhost",
1138 )))
1139 .with_timestamp(Some(ts()))
1140 );
1141 }
1142
1143 #[tokio::test]
1144 async fn count_http_requests_with_tags_expansion() {
1145 let config = parse_config(
1146 r#"
1147 [[metrics]]
1148 type = "counter"
1149 field = "message"
1150 name = "http_requests_total"
1151 namespace = "app"
1152 tags = {"*" = "{{ dict }}"}
1153 "#,
1154 );
1155
1156 let mut event = create_event("message", "i am log");
1157 let log = event.as_mut_log();
1158
1159 let mut test_dict = ObjectMap::default();
1160 test_dict.insert("one".into(), Value::from("foo"));
1161 test_dict.insert("two".into(), Value::from("baz"));
1162 log.insert("dict", Value::from(test_dict));
1163
1164 let mut metadata =
1165 event
1166 .metadata()
1167 .clone()
1168 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1169 None,
1170 None,
1171 Some(ORIGIN_SERVICE_VALUE),
1172 ));
1173 metadata.set_schema_definition(&Arc::new(Definition::any()));
1175 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1176 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1177
1178 let metric = do_transform(config, event).await.unwrap();
1179
1180 assert_eq!(
1181 metric.into_metric(),
1182 Metric::new_with_metadata(
1183 "http_requests_total",
1184 MetricKind::Incremental,
1185 MetricValue::Counter { value: 1.0 },
1186 metadata,
1187 )
1188 .with_namespace(Some("app"))
1189 .with_tags(Some(metric_tags!(
1190 "one" => "foo",
1191 "two" => "baz",
1192 )))
1193 .with_timestamp(Some(ts()))
1194 );
1195 }

    #[tokio::test]
1197 async fn count_http_requests_with_colliding_dynamic_tags() {
1198 let config = parse_config(
1199 r#"
1200 [[metrics]]
1201 type = "counter"
1202 field = "message"
1203 name = "http_requests_total"
1204 namespace = "app"
1205 tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1206 "#,
1207 );
1208
1209 let mut event = create_event("message", "i am log");
1210 let log = event.as_mut_log();
1211
1212 let mut map1 = ObjectMap::default();
1213 map1.insert("key1".into(), Value::from("val1"));
1214 log.insert("map1", Value::from(map1));
1215
1216 let mut map2 = ObjectMap::default();
1217 map2.insert("l1_key1".into(), Value::from("val2"));
1218 log.insert("map2", Value::from(map2));
1219
1220 let mut metadata =
1221 event
1222 .metadata()
1223 .clone()
1224 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1225 None,
1226 None,
1227 Some(ORIGIN_SERVICE_VALUE),
1228 ));
1229 metadata.set_schema_definition(&Arc::new(Definition::any()));
1231 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1232 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1233
1234 let metric = do_transform(config, event).await.unwrap().into_metric();
1235 let tags = metric.tags().expect("Metric should have tags");
1236
1237 assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1238
1239 assert_eq!(tags.iter_all().count(), 2);
1240 for (name, value) in tags.iter_all() {
1241 assert_eq!(name, "l1_key1");
1242 assert!(value == Some("val1") || value == Some("val2"));
1243 }
1244 }

    #[tokio::test]
1246 async fn multi_value_tags_yaml() {
1247 let config = parse_yaml_config(
1249 r#"
1250 metrics:
1251 - field: "message"
1252 type: "counter"
1253 tags:
1254 tag:
1255 - "one"
1256 - null
1257 - "two"
1258 "#,
1259 );
1260
1261 let event = create_event("message", "I am log");
1262 let metric = do_transform(config, event).await.unwrap().into_metric();
1263 let tags = metric.tags().expect("Metric should have tags");
1264
1265 assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1266
1267 assert_eq!(tags.iter_all().count(), 3);
1268 for (name, value) in tags.iter_all() {
1269 assert_eq!(name, "tag");
1270 assert!(value.is_none() || value == Some("one") || value == Some("two"));
1271 }
1272 }

    #[tokio::test]
1274 async fn multi_value_tags_expansion_yaml() {
1275 let config = parse_yaml_config(
1277 r#"
1278 metrics:
1279 - field: "message"
1280 type: "counter"
1281 tags:
1282 "*": "{{dict}}"
1283 "#,
1284 );
1285
1286 let mut event = create_event("message", "I am log");
1287 let log = event.as_mut_log();
1288
1289 let mut test_dict = ObjectMap::default();
1290 test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1291 log.insert("dict", Value::from(test_dict));
1292
1293 let metric = do_transform(config, event).await.unwrap().into_metric();
1294 let tags = metric.tags().expect("Metric should have tags");
1295
1296 assert_eq!(
1297 tags.iter_single().collect::<Vec<_>>(),
1298 vec![("one", "[\"foo\",\"baz\"]")]
1299 );
1300
1301 assert_eq!(tags.iter_all().count(), 1);
1302 for (name, value) in tags.iter_all() {
1303 assert_eq!(name, "one");
1304 assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1305 }
1306 }
1307
1308 #[tokio::test]
1309 async fn multi_value_tags_toml() {
1310 let config = parse_config(
1311 r#"
1312 [[metrics]]
1313 field = "message"
1314 type = "counter"
1315 [metrics.tags]
1316 tag = ["one", "two"]
1317 "#,
1318 );
1319
1320 let event = create_event("message", "I am log");
1321 let metric = do_transform(config, event).await.unwrap().into_metric();
1322 let tags = metric.tags().expect("Metric should have tags");
1323
1324 assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1325
1326 assert_eq!(tags.iter_all().count(), 2);
1327 for (name, value) in tags.iter_all() {
1328 assert_eq!(name, "tag");
1329 assert!(value == Some("one") || value == Some("two"));
1330 }
1331 }
1332
1333 #[tokio::test]
1334 async fn count_exceptions() {
1335 let config = parse_config(
1336 r#"
1337 [[metrics]]
1338 type = "counter"
1339 field = "backtrace"
1340 name = "exception_total"
1341 "#,
1342 );
1343
1344 let event = create_event("backtrace", "message");
1345 let mut metadata =
1346 event
1347 .metadata()
1348 .clone()
1349 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1350 None,
1351 None,
1352 Some(ORIGIN_SERVICE_VALUE),
1353 ));
1354 metadata.set_schema_definition(&Arc::new(Definition::any()));
1356 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1357 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1358
1359 let metric = do_transform(config, event).await.unwrap();
1360
1361 assert_eq!(
1362 metric.into_metric(),
1363 Metric::new_with_metadata(
1364 "exception_total",
1365 MetricKind::Incremental,
1366 MetricValue::Counter { value: 1.0 },
1367 metadata
1368 )
1369 .with_timestamp(Some(ts()))
1370 );
1371 }
1372
1373 #[tokio::test]
1374 async fn count_exceptions_no_match() {
1375 let config = parse_config(
1376 r#"
1377 [[metrics]]
1378 type = "counter"
1379 field = "backtrace"
1380 name = "exception_total"
1381 "#,
1382 );
1383
1384 let event = create_event("success", "42");
1385 assert_eq!(do_transform(config, event).await, None);
1386 }
1387
1388 #[tokio::test]
1389 async fn sum_order_amounts() {
1390 let config = parse_config(
1391 r#"
1392 [[metrics]]
1393 type = "counter"
1394 field = "amount"
1395 name = "amount_total"
1396 increment_by_value = true
1397 "#,
1398 );
1399
1400 let event = create_event("amount", "33.99");
1401 let mut metadata =
1402 event
1403 .metadata()
1404 .clone()
1405 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1406 None,
1407 None,
1408 Some(ORIGIN_SERVICE_VALUE),
1409 ));
1410 metadata.set_schema_definition(&Arc::new(Definition::any()));
1412 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1413 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1414 let metric = do_transform(config, event).await.unwrap();
1415
1416 assert_eq!(
1417 metric.into_metric(),
1418 Metric::new_with_metadata(
1419 "amount_total",
1420 MetricKind::Incremental,
1421 MetricValue::Counter { value: 33.99 },
1422 metadata,
1423 )
1424 .with_timestamp(Some(ts()))
1425 );
1426 }
1427
1428 #[tokio::test]
1429 async fn count_absolute() {
1430 let config = parse_config(
1431 r#"
1432 [[metrics]]
1433 type = "counter"
1434 field = "amount"
1435 name = "amount_total"
1436 increment_by_value = true
1437 kind = "absolute"
1438 "#,
1439 );
1440
1441 let event = create_event("amount", "33.99");
1442 let mut metadata =
1443 event
1444 .metadata()
1445 .clone()
1446 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1447 None,
1448 None,
1449 Some(ORIGIN_SERVICE_VALUE),
1450 ));
1451 metadata.set_schema_definition(&Arc::new(Definition::any()));
1453 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1454 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1455
1456 let metric = do_transform(config, event).await.unwrap();
1457
1458 assert_eq!(
1459 metric.into_metric(),
1460 Metric::new_with_metadata(
1461 "amount_total",
1462 MetricKind::Absolute,
1463 MetricValue::Counter { value: 33.99 },
1464 metadata,
1465 )
1466 .with_timestamp(Some(ts()))
1467 );
1468 }
1469
1470 #[tokio::test]
1471 async fn memory_usage_gauge() {
1472 let config = parse_config(
1473 r#"
1474 [[metrics]]
1475 type = "gauge"
1476 field = "memory_rss"
1477 name = "memory_rss_bytes"
1478 "#,
1479 );
1480
1481 let event = create_event("memory_rss", "123");
1482 let mut metadata =
1483 event
1484 .metadata()
1485 .clone()
1486 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1487 None,
1488 None,
1489 Some(ORIGIN_SERVICE_VALUE),
1490 ));
1491
1492 metadata.set_schema_definition(&Arc::new(Definition::any()));
1494
1495 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1496 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1497
1498 let metric = do_transform(config, event).await.unwrap();
1499
1500 assert_eq!(
1501 metric.into_metric(),
1502 Metric::new_with_metadata(
1503 "memory_rss_bytes",
1504 MetricKind::Absolute,
1505 MetricValue::Gauge { value: 123.0 },
1506 metadata,
1507 )
1508 .with_timestamp(Some(ts()))
1509 );
1510 }
1511
1512 #[tokio::test]
1513 async fn parse_failure() {
1514 let config = parse_config(
1515 r#"
1516 [[metrics]]
1517 type = "counter"
1518 field = "status"
1519 name = "status_total"
1520 increment_by_value = true
1521 "#,
1522 );
1523
1524 let event = create_event("status", "not a number");
1525 assert_eq!(do_transform(config, event).await, None);
1526 }
1527
1528 #[tokio::test]
1529 async fn missing_field() {
1530 let config = parse_config(
1531 r#"
1532 [[metrics]]
1533 type = "counter"
1534 field = "status"
1535 name = "status_total"
1536 "#,
1537 );
1538
1539 let event = create_event("not foo", "not a number");
1540 assert_eq!(do_transform(config, event).await, None);
1541 }
1542
1543 #[tokio::test]
1544 async fn null_field() {
1545 let config = parse_config(
1546 r#"
1547 [[metrics]]
1548 type = "counter"
1549 field = "status"
1550 name = "status_total"
1551 "#,
1552 );
1553
1554 let event = create_event("status", Value::Null);
1555 assert_eq!(do_transform(config, event).await, None);
1556 }
1557
1558 #[tokio::test]
1559 async fn multiple_metrics() {
1560 let config = parse_config(
1561 r#"
1562 [[metrics]]
1563 type = "counter"
1564 field = "status"
1565
1566 [[metrics]]
1567 type = "counter"
1568 field = "backtrace"
1569 name = "exception_total"
1570 "#,
1571 );
1572
1573 let mut event = Event::Log(LogEvent::from("i am a log"));
1574 event
1575 .as_mut_log()
1576 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1577 event.as_mut_log().insert("status", "42");
1578 event.as_mut_log().insert("backtrace", "message");
1579 let mut metadata =
1580 event
1581 .metadata()
1582 .clone()
1583 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1584 None,
1585 None,
1586 Some(ORIGIN_SERVICE_VALUE),
1587 ));
1588
1589 metadata.set_schema_definition(&Arc::new(Definition::any()));
1591 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1592 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1593
1594 let output = do_transform_multiple_events(config, event, 2).await;
1595
1596 assert_eq!(2, output.len());
1597 assert_eq!(
1598 output[0].clone().into_metric(),
1599 Metric::new_with_metadata(
1600 "status",
1601 MetricKind::Incremental,
1602 MetricValue::Counter { value: 1.0 },
1603 metadata.clone(),
1604 )
1605 .with_timestamp(Some(ts()))
1606 );
1607 assert_eq!(
1608 output[1].clone().into_metric(),
1609 Metric::new_with_metadata(
1610 "exception_total",
1611 MetricKind::Incremental,
1612 MetricValue::Counter { value: 1.0 },
1613 metadata,
1614 )
1615 .with_timestamp(Some(ts()))
1616 );
1617 }
1618
1619 #[tokio::test]
1620 async fn multiple_metrics_with_multiple_templates() {
1621 let config = parse_config(
1622 r#"
1623 [[metrics]]
1624 type = "set"
1625 field = "status"
1626 name = "{{host}}_{{worker}}_status_set"
1627
1628 [[metrics]]
1629 type = "counter"
1630 field = "backtrace"
1631 name = "{{service}}_exception_total"
1632 namespace = "{{host}}"
1633 "#,
1634 );
1635
1636 let mut event = Event::Log(LogEvent::from("i am a log"));
1637 event
1638 .as_mut_log()
1639 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1640 event.as_mut_log().insert("status", "42");
1641 event.as_mut_log().insert("backtrace", "message");
1642 event.as_mut_log().insert("host", "local");
1643 event.as_mut_log().insert("worker", "abc");
1644 event.as_mut_log().insert("service", "xyz");
1645 let mut metadata =
1646 event
1647 .metadata()
1648 .clone()
1649 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1650 None,
1651 None,
1652 Some(ORIGIN_SERVICE_VALUE),
1653 ));
1654
1655 metadata.set_schema_definition(&Arc::new(Definition::any()));
1657 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1658 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1659
1660 let output = do_transform_multiple_events(config, event, 2).await;
1661
1662 assert_eq!(2, output.len());
1663 assert_eq!(
1664 output[0].as_metric(),
1665 &Metric::new_with_metadata(
1666 "local_abc_status_set",
1667 MetricKind::Incremental,
1668 MetricValue::Set {
1669 values: vec!["42".into()].into_iter().collect()
1670 },
1671 metadata.clone(),
1672 )
1673 .with_timestamp(Some(ts()))
1674 );
1675 assert_eq!(
1676 output[1].as_metric(),
1677 &Metric::new_with_metadata(
1678 "xyz_exception_total",
1679 MetricKind::Incremental,
1680 MetricValue::Counter { value: 1.0 },
1681 metadata,
1682 )
1683 .with_namespace(Some("local"))
1684 .with_timestamp(Some(ts()))
1685 );
1686 }
1687
1688 #[tokio::test]
1689 async fn user_ip_set() {
1690 let config = parse_config(
1691 r#"
1692 [[metrics]]
1693 type = "set"
1694 field = "user_ip"
1695 name = "unique_user_ip"
1696 "#,
1697 );
1698
1699 let event = create_event("user_ip", "1.2.3.4");
1700 let mut metadata =
1701 event
1702 .metadata()
1703 .clone()
1704 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1705 None,
1706 None,
1707 Some(ORIGIN_SERVICE_VALUE),
1708 ));
1709 metadata.set_schema_definition(&Arc::new(Definition::any()));
1711 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1712 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1713
1714 let metric = do_transform(config, event).await.unwrap();
1715
1716 assert_eq!(
1717 metric.into_metric(),
1718 Metric::new_with_metadata(
1719 "unique_user_ip",
1720 MetricKind::Incremental,
1721 MetricValue::Set {
1722 values: vec!["1.2.3.4".into()].into_iter().collect()
1723 },
1724 metadata,
1725 )
1726 .with_timestamp(Some(ts()))
1727 );
1728 }
1729
1730 #[tokio::test]
1731 async fn response_time_histogram() {
1732 let config = parse_config(
1733 r#"
1734 [[metrics]]
1735 type = "histogram"
1736 field = "response_time"
1737 "#,
1738 );
1739
1740 let event = create_event("response_time", "2.5");
1741 let mut metadata =
1742 event
1743 .metadata()
1744 .clone()
1745 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1746 None,
1747 None,
1748 Some(ORIGIN_SERVICE_VALUE),
1749 ));
1750
1751 metadata.set_schema_definition(&Arc::new(Definition::any()));
1753 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1754 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1755
1756 let metric = do_transform(config, event).await.unwrap();
1757
1758 assert_eq!(
1759 metric.into_metric(),
1760 Metric::new_with_metadata(
1761 "response_time",
1762 MetricKind::Incremental,
1763 MetricValue::Distribution {
1764 samples: vector_lib::samples![2.5 => 1],
1765 statistic: StatisticKind::Histogram
1766 },
1767 metadata
1768 )
1769 .with_timestamp(Some(ts()))
1770 );
1771 }
1772
1773 #[tokio::test]
1774 async fn response_time_summary() {
1775 let config = parse_config(
1776 r#"
1777 [[metrics]]
1778 type = "summary"
1779 field = "response_time"
1780 "#,
1781 );
1782
1783 let event = create_event("response_time", "2.5");
1784 let mut metadata =
1785 event
1786 .metadata()
1787 .clone()
1788 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1789 None,
1790 None,
1791 Some(ORIGIN_SERVICE_VALUE),
1792 ));
1793
1794 metadata.set_schema_definition(&Arc::new(Definition::any()));
1796 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1797 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1798
1799 let metric = do_transform(config, event).await.unwrap();
1800
1801 assert_eq!(
1802 metric.into_metric(),
1803 Metric::new_with_metadata(
1804 "response_time",
1805 MetricKind::Incremental,
1806 MetricValue::Distribution {
1807 samples: vector_lib::samples![2.5 => 1],
1808 statistic: StatisticKind::Summary
1809 },
1810 metadata
1811 )
1812 .with_timestamp(Some(ts()))
1813 );
1814 }
1815
1816 fn create_log_event(json_str: &str) -> Event {
1819 create_log_event_with_namespace(json_str, Some("test_namespace"))
1820 }
1821
1822 fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1823 let mut log_value: Value =
1824 serde_json::from_str(json_str).expect("JSON was not well-formatted");
1825 log_value.insert("timestamp", ts());
1826
1827 if let Some(namespace) = namespace {
1828 log_value.insert("namespace", namespace);
1829 }
1830
1831 let mut metadata = EventMetadata::default();
1832 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1833 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1834
1835 Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1836 }
1837
1838 #[tokio::test]
1839 async fn transform_gauge() {
1840 let config = LogToMetricConfig {
1841 metrics: None,
1842 all_metrics: Some(true),
1843 };
1844
1845 let json_str = r#"{
1846 "gauge": {
1847 "value": 990.0
1848 },
1849 "kind": "absolute",
1850 "name": "test.transform.gauge",
1851 "tags": {
1852 "env": "test_env",
1853 "host": "localhost"
1854 }
1855 }"#;
1856 let log = create_log_event(json_str);
1857 let metric = do_transform(config, log.clone()).await.unwrap();
1858 assert_eq!(
1859 *metric.as_metric(),
1860 Metric::new_with_metadata(
1861 "test.transform.gauge",
1862 MetricKind::Absolute,
1863 MetricValue::Gauge { value: 990.0 },
1864 metric.metadata().clone(),
1865 )
1866 .with_namespace(Some("test_namespace"))
1867 .with_tags(Some(metric_tags!(
1868 "env" => "test_env",
1869 "host" => "localhost",
1870 )))
1871 .with_timestamp(Some(ts()))
1872 );
1873 }
1874
1875 #[tokio::test]
1876 async fn transform_histogram() {
1877 let config = LogToMetricConfig {
1878 metrics: None,
1879 all_metrics: Some(true),
1880 };
1881
1882 let json_str = r#"{
1883 "histogram": {
1884 "sum": 18.0,
1885 "count": 5,
1886 "buckets": [
1887 {
1888 "upper_limit": 1.0,
1889 "count": 1
1890 },
1891 {
1892 "upper_limit": 2.0,
1893 "count": 2
1894 },
1895 {
1896 "upper_limit": 5.0,
1897 "count": 1
1898 },
1899 {
1900 "upper_limit": 10.0,
1901 "count": 1
1902 }
1903 ]
1904 },
1905 "kind": "absolute",
1906 "name": "test.transform.histogram",
1907 "tags": {
1908 "env": "test_env",
1909 "host": "localhost"
1910 }
1911 }"#;
1912 let log = create_log_event(json_str);
1913 let metric = do_transform(config, log.clone()).await.unwrap();
1914 assert_eq!(
1915 *metric.as_metric(),
1916 Metric::new_with_metadata(
1917 "test.transform.histogram",
1918 MetricKind::Absolute,
1919 MetricValue::AggregatedHistogram {
1920 count: 5,
1921 sum: 18.0,
1922 buckets: vec![
1923 Bucket {
1924 upper_limit: 1.0,
1925 count: 1,
1926 },
1927 Bucket {
1928 upper_limit: 2.0,
1929 count: 2,
1930 },
1931 Bucket {
1932 upper_limit: 5.0,
1933 count: 1,
1934 },
1935 Bucket {
1936 upper_limit: 10.0,
1937 count: 1,
1938 },
1939 ],
1940 },
1941 metric.metadata().clone(),
1942 )
1943 .with_namespace(Some("test_namespace"))
1944 .with_tags(Some(metric_tags!(
1945 "env" => "test_env",
1946 "host" => "localhost",
1947 )))
1948 .with_timestamp(Some(ts()))
1949 );
1950 }
1951
1952 #[tokio::test]
1953 async fn transform_distribution_histogram() {
1954 let config = LogToMetricConfig {
1955 metrics: None,
1956 all_metrics: Some(true),
1957 };
1958
1959 let json_str = r#"{
1960 "distribution": {
1961 "samples": [
1962 {
1963 "value": 1.0,
1964 "rate": 1
1965 },
1966 {
1967 "value": 2.0,
1968 "rate": 2
1969 }
1970 ],
1971 "statistic": "histogram"
1972 },
1973 "kind": "absolute",
1974 "name": "test.transform.distribution_histogram",
1975 "tags": {
1976 "env": "test_env",
1977 "host": "localhost"
1978 }
1979 }"#;
1980 let log = create_log_event(json_str);
1981 let metric = do_transform(config, log.clone()).await.unwrap();
1982 assert_eq!(
1983 *metric.as_metric(),
1984 Metric::new_with_metadata(
1985 "test.transform.distribution_histogram",
1986 MetricKind::Absolute,
1987 MetricValue::Distribution {
1988 samples: vec![
1989 Sample {
1990 value: 1.0,
1991 rate: 1
1992 },
1993 Sample {
1994 value: 2.0,
1995 rate: 2
1996 },
1997 ],
1998 statistic: StatisticKind::Histogram,
1999 },
2000 metric.metadata().clone(),
2001 )
2002 .with_namespace(Some("test_namespace"))
2003 .with_tags(Some(metric_tags!(
2004 "env" => "test_env",
2005 "host" => "localhost",
2006 )))
2007 .with_timestamp(Some(ts()))
2008 );
2009 }
2010
2011 #[tokio::test]
2012 async fn transform_distribution_summary() {
2013 let config = LogToMetricConfig {
2014 metrics: None,
2015 all_metrics: Some(true),
2016 };
2017
2018 let json_str = r#"{
2019 "distribution": {
2020 "samples": [
2021 {
2022 "value": 1.0,
2023 "rate": 1
2024 },
2025 {
2026 "value": 2.0,
2027 "rate": 2
2028 }
2029 ],
2030 "statistic": "summary"
2031 },
2032 "kind": "absolute",
2033 "name": "test.transform.distribution_summary",
2034 "tags": {
2035 "env": "test_env",
2036 "host": "localhost"
2037 }
2038 }"#;
2039 let log = create_log_event(json_str);
2040 let metric = do_transform(config, log.clone()).await.unwrap();
2041 assert_eq!(
2042 *metric.as_metric(),
2043 Metric::new_with_metadata(
2044 "test.transform.distribution_summary",
2045 MetricKind::Absolute,
2046 MetricValue::Distribution {
2047 samples: vec![
2048 Sample {
2049 value: 1.0,
2050 rate: 1
2051 },
2052 Sample {
2053 value: 2.0,
2054 rate: 2
2055 },
2056 ],
2057 statistic: StatisticKind::Summary,
2058 },
2059 metric.metadata().clone(),
2060 )
2061 .with_namespace(Some("test_namespace"))
2062 .with_tags(Some(metric_tags!(
2063 "env" => "test_env",
2064 "host" => "localhost",
2065 )))
2066 .with_timestamp(Some(ts()))
2067 );
2068 }
2069
2070 #[tokio::test]
2071 async fn transform_summary() {
2072 let config = LogToMetricConfig {
2073 metrics: None,
2074 all_metrics: Some(true),
2075 };
2076
2077 let json_str = r#"{
2078 "summary": {
2079 "sum": 100.0,
2080 "count": 7,
2081 "quantiles": [
2082 {
2083 "quantile": 0.05,
2084 "value": 10.0
2085 },
2086 {
2087 "quantile": 0.95,
2088 "value": 25.0
2089 }
2090 ]
2091 },
2092 "kind": "absolute",
2093 "name": "test.transform.histogram",
2094 "tags": {
2095 "env": "test_env",
2096 "host": "localhost"
2097 }
2098 }"#;
2099 let log = create_log_event(json_str);
2100 let metric = do_transform(config, log.clone()).await.unwrap();
2101 assert_eq!(
2102 *metric.as_metric(),
2103 Metric::new_with_metadata(
2104 "test.transform.histogram",
2105 MetricKind::Absolute,
2106 MetricValue::AggregatedSummary {
2107 quantiles: vec![
2108 Quantile {
2109 quantile: 0.05,
2110 value: 10.0,
2111 },
2112 Quantile {
2113 quantile: 0.95,
2114 value: 25.0,
2115 },
2116 ],
2117 count: 7,
2118 sum: 100.0,
2119 },
2120 metric.metadata().clone(),
2121 )
2122 .with_namespace(Some("test_namespace"))
2123 .with_tags(Some(metric_tags!(
2124 "env" => "test_env",
2125 "host" => "localhost",
2126 )))
2127 .with_timestamp(Some(ts()))
2128 );
2129 }
2130
2131 #[tokio::test]
2132 async fn transform_counter() {
2133 let config = LogToMetricConfig {
2134 metrics: None,
2135 all_metrics: Some(true),
2136 };
2137
2138 let json_str = r#"{
2139 "counter": {
2140 "value": 10.0
2141 },
2142 "kind": "incremental",
2143 "name": "test.transform.counter",
2144 "tags": {
2145 "env": "test_env",
2146 "host": "localhost"
2147 }
2148 }"#;
2149 let log = create_log_event(json_str);
2150 let metric = do_transform(config, log.clone()).await.unwrap();
2151 assert_eq!(
2152 *metric.as_metric(),
2153 Metric::new_with_metadata(
2154 "test.transform.counter",
2155 MetricKind::Incremental,
2156 MetricValue::Counter { value: 10.0 },
2157 metric.metadata().clone(),
2158 )
2159 .with_namespace(Some("test_namespace"))
2160 .with_tags(Some(metric_tags!(
2161 "env" => "test_env",
2162 "host" => "localhost",
2163 )))
2164 .with_timestamp(Some(ts()))
2165 );
2166 }
2167
2168 #[tokio::test]
2169 async fn transform_set() {
2170 let config = LogToMetricConfig {
2171 metrics: None,
2172 all_metrics: Some(true),
2173 };
2174
2175 let json_str = r#"{
2176 "set": {
2177 "values": ["990.0", "1234"]
2178 },
2179 "kind": "incremental",
2180 "name": "test.transform.set",
2181 "tags": {
2182 "env": "test_env",
2183 "host": "localhost"
2184 }
2185 }"#;
2186 let log = create_log_event(json_str);
2187 let metric = do_transform(config, log.clone()).await.unwrap();
2188 assert_eq!(
2189 *metric.as_metric(),
2190 Metric::new_with_metadata(
2191 "test.transform.set",
2192 MetricKind::Incremental,
2193 MetricValue::Set {
2194 values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2195 },
2196 metric.metadata().clone(),
2197 )
2198 .with_namespace(Some("test_namespace"))
2199 .with_tags(Some(metric_tags!(
2200 "env" => "test_env",
2201 "host" => "localhost",
2202 )))
2203 .with_timestamp(Some(ts()))
2204 );
2205 }
2206
2207 #[tokio::test]
2208 async fn transform_all_metrics_optional_namespace() {
2209 let config = LogToMetricConfig {
2210 metrics: None,
2211 all_metrics: Some(true),
2212 };
2213
2214 let json_str = r#"{
2215 "counter": {
2216 "value": 10.0
2217 },
2218 "kind": "incremental",
2219 "name": "test.transform.counter",
2220 "tags": {
2221 "env": "test_env",
2222 "host": "localhost"
2223 }
2224 }"#;
2225 let log = create_log_event_with_namespace(json_str, None);
2226 let metric = do_transform(config, log.clone()).await.unwrap();
2227 assert_eq!(
2228 *metric.as_metric(),
2229 Metric::new_with_metadata(
2230 "test.transform.counter",
2231 MetricKind::Incremental,
2232 MetricValue::Counter { value: 10.0 },
2233 metric.metadata().clone(),
2234 )
2235 .with_tags(Some(metric_tags!(
2236 "env" => "test_env",
2237 "host" => "localhost",
2238 )))
2239 .with_timestamp(Some(ts()))
2240 );
2241 }
2242}