1use std::{collections::HashMap, num::ParseFloatError, sync::Arc};
2
3use chrono::Utc;
4use indexmap::IndexMap;
5use vector_lib::{
6 configurable::configurable_component,
7 event::{
8 DatadogMetricOriginMetadata, LogEvent,
9 metric::{Bucket, Quantile, Sample},
10 },
11};
12use vrl::{
13 event_path, path,
14 path::{PathParseError, parse_target_path},
15};
16
17use crate::{
18 common::expansion::pair_expansion,
19 config::{
20 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
21 TransformOutput, schema::Definition,
22 },
23 event::{
24 Event, Value,
25 metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
26 },
27 internal_events::{
28 DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
29 MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
30 MetricMetadataParseError, ParserMissingFieldError,
31 },
32 schema,
33 template::{Template, TemplateRenderingError},
34 transforms::{
35 FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
36 },
37};
38
// Datadog origin-metadata "service" value attached to every metric produced by this
// transform (see `DatadogMetricOriginMetadata` usage below). The meaning of `3` is
// defined by the Datadog origin registry — TODO(review): confirm against that registry.
const ORIGIN_SERVICE_VALUE: u32 = 3;
40
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// Per-field metric definitions; used when `all_metrics` is not enabled.
    pub metrics: Option<Vec<MetricConfig>>,

    /// When `true`, the whole log event is interpreted as a self-describing metric
    /// (name/kind/tags plus one of counter/gauge/set/distribution/…) and `metrics` is ignored.
    pub all_metrics: Option<bool>,
}
78
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// When `true`, the counter is incremented by the (float-parsed) field value;
    /// otherwise each matching event increments the counter by 1.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}
91
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Log field (template) whose value feeds the metric.
    pub field: Template,

    /// Metric name template; defaults to the `field` template when unset.
    pub name: Option<Template>,

    /// Optional metric namespace template.
    pub namespace: Option<Template>,

    /// Tags to attach to the metric; both keys and values are templates.
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}
123
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value; `None` produces a bare (value-less) tag.
    Plain(Option<Template>),

    /// Multiple values for the same tag key; `None` entries produce bare tags.
    Multi(Vec<Option<Template>>),
}
137
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter, optionally incremented by the field value.
    Counter(CounterConfig),

    /// An incremental distribution with histogram statistics.
    Histogram,

    /// An absolute gauge taken from the field value.
    Gauge,

    /// An incremental set containing the field value.
    Set,

    /// An incremental distribution with summary statistics.
    Summary,
}
159
impl MetricConfig {
    /// Raw (unrendered) template string of the source field.
    fn field(&self) -> &str {
        self.field.get_ref()
    }
}
165
// Serde default: counters increment by 1 per event unless configured otherwise.
const fn default_increment_by_value() -> bool {
    false
}

// Serde default: counters are incremental unless configured otherwise.
const fn default_kind() -> MetricKind {
    MetricKind::Incremental
}
173
/// Runtime form of [`LogToMetricConfig`] with the `Option`s resolved.
#[derive(Debug, Clone)]
pub struct LogToMetric {
    pub metrics: Vec<MetricConfig>,
    pub all_metrics: bool,
}
179
impl GenerateConfig for LogToMetricConfig {
    /// Example config shown by `vector generate`: a single 1-per-event counter.
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            metrics: Some(vec![MetricConfig {
                field: "field_name".try_into().expect("Fixed template"),
                name: None,
                namespace: None,
                tags: None,
                metric: MetricTypeConfig::Counter(CounterConfig {
                    increment_by_value: false,
                    kind: MetricKind::Incremental,
                }),
            }]),
            all_metrics: Some(true),
        })
        .unwrap()
    }
}
198
#[async_trait::async_trait]
#[typetag::serde(name = "log_to_metric")]
impl TransformConfig for LogToMetricConfig {
    /// Builds the function transform, defaulting `metrics` to empty and
    /// `all_metrics` to `false` when unset.
    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
        Ok(Transform::function(LogToMetric {
            metrics: self.metrics.clone().unwrap_or_default(),
            all_metrics: self.all_metrics.unwrap_or_default(),
        }))
    }

    fn input(&self) -> Input {
        Input::log()
    }

    // Output is always metrics; no log schema definition is propagated.
    fn outputs(
        &self,
        _: &TransformContext,
        _: &[(OutputId, schema::Definition)],
    ) -> Vec<TransformOutput> {
        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
    }

    // Stateless per-event conversion, so events can be processed concurrently.
    fn enable_concurrency(&self) -> bool {
        true
    }
}
226
/// Which primitive type failed to parse while decoding a self-describing metric event.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// The field could not be read as a float.
    FloatError,
    /// The field could not be read as an integer.
    IntError,
    /// The field could not be read as an array (or array element).
    ArrayError,
}
238
239impl std::fmt::Display for TransformParseErrorKind {
240 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
241 write!(f, "{self:?}")
242 }
243}
244
/// Internal error type for both conversion modes; each variant maps to a
/// specific internal event emitted in `FunctionTransform::transform`.
enum TransformError {
    /// The configured/required field path does not exist on the event.
    PathNotFound {
        path: String,
    },
    /// The field exists but holds an explicit null.
    PathNull {
        path: String,
    },
    /// No recognized metric-detail key (counter/gauge/set/…) was found on the event.
    MetricDetailsNotFound,
    /// A field held a value outside its allowed set (e.g. unknown `kind`).
    MetricValueError {
        path: String,
        path_value: String,
    },
    /// A field could not be parsed as the expected primitive type.
    ParseError {
        path: String,
        kind: TransformParseErrorKind,
    },
    /// A field's string form could not be parsed as `f64`.
    ParseFloatError {
        path: String,
        error: ParseFloatError,
    },
    /// A name/namespace/tag template failed to render.
    TemplateRenderingError(TemplateRenderingError),
    /// Expanding a `*`-style tag key/value pair failed.
    PairExpansionError {
        key: String,
        value: String,
        error: serde_json::Error,
    },
}
272
273fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
274 template
275 .render_string(event)
276 .map_err(TransformError::TemplateRenderingError)
277}
278
/// Renders the configured tag templates against `event` into a `MetricTags` set.
///
/// Returns `Ok(None)` when no tags are configured or all rendered tags are empty.
/// `static_tags`/`dynamic_tags` are populated by `pair_expansion` (via
/// `render_tag_into`) and used afterwards to resolve key collisions between
/// statically-keyed tags and `*`-expanded ones — static wins and a warning is logged.
fn render_tags(
    tags: &Option<IndexMap<Template, TagConfig>>,
    event: &Event,
) -> Result<Option<MetricTags>, TransformError> {
    let mut static_tags: HashMap<String, String> = HashMap::new();
    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
    Ok(match tags {
        None => None,
        Some(tags) => {
            let mut result = MetricTags::default();
            for (name, config) in tags {
                match config {
                    // Single value (or bare tag when the template is None).
                    TagConfig::Plain(template) => {
                        render_tag_into(
                            event,
                            name,
                            template.as_ref(),
                            &mut result,
                            &mut static_tags,
                            &mut dynamic_tags,
                        )?;
                    }
                    // Multiple values under the same key; each is rendered independently.
                    TagConfig::Multi(vec) => {
                        for template in vec {
                            render_tag_into(
                                event,
                                name,
                                template.as_ref(),
                                &mut result,
                                &mut static_tags,
                                &mut dynamic_tags,
                            )?;
                        }
                    }
                }
            }
            // Collision pass: a static tag replaces a same-keyed dynamic tag in the
            // bookkeeping map and the discarded dynamic value is logged.
            // NOTE(review): this only updates `dynamic_tags`, not `result`; the
            // precedence in `result` is presumably enforced by `pair_expansion` — confirm.
            for (k, v) in static_tags {
                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
                    warn!(
                        "Static tags overrides dynamic tags. \
                    key: {}, value: {:?}, discarded value: {:?}",
                        k, v, discarded_v
                    );
                };
            }
            // Collapses an empty tag set to `None`.
            result.as_option()
        }
    })
}
328
/// Renders one tag key/value template pair into `result`.
///
/// Template rendering failures on either the key or the value are *not* fatal:
/// they emit a `TemplateRenderingError` internal event (without dropping the
/// event) and the tag is simply skipped. Any other error propagates.
/// A `None` value template produces a bare tag.
fn render_tag_into(
    event: &Event,
    key_template: &Template,
    value_template: Option<&Template>,
    result: &mut MetricTags,
    static_tags: &mut HashMap<String, String>,
    dynamic_tags: &mut HashMap<String, String>,
) -> Result<(), TransformError> {
    let key = match render_template(key_template, event) {
        Ok(key_s) => key_s,
        // Unrenderable key: emit telemetry, skip this tag, keep the event.
        Err(TransformError::TemplateRenderingError(err)) => {
            emit!(crate::internal_events::TemplateRenderingError {
                error: err,
                drop_event: false,
                field: Some(key_template.get_ref()),
            });
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    match value_template {
        None => {
            result.insert(key, TagValue::Bare);
        }
        Some(template) => match render_template(template, event) {
            Ok(value) => {
                // `pair_expansion` handles `*`-style keys, expanding object values
                // into multiple tags and recording them in static/dynamic maps.
                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
                result.extend(expanded_pairs);
            }
            // Unrenderable value: emit telemetry, skip this tag, keep the event.
            Err(TransformError::TemplateRenderingError(value_error)) => {
                emit!(crate::internal_events::TemplateRenderingError {
                    error: value_error,
                    drop_event: false,
                    field: Some(template.get_ref()),
                });
                return Ok(());
            }
            Err(other) => return Err(other),
        },
    };
    Ok(())
}
372
/// Converts a log event into a single metric according to one [`MetricConfig`]
/// (the per-config mode, used when `all_metrics` is disabled).
fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
    let log = event.as_log();

    // Use the log's timestamp when present, otherwise stamp with "now".
    let timestamp = log
        .get_timestamp()
        .and_then(Value::as_timestamp)
        .cloned()
        .or_else(|| Some(Utc::now()));

    // Carry the event metadata over, relaxing the schema definition (the output
    // is a metric, not a log) and tagging the Datadog origin as this transform.
    let metadata = event
        .metadata()
        .clone()
        .with_schema_definition(&Arc::new(Definition::any()))
        .with_origin_metadata(DatadogMetricOriginMetadata::new(
            None,
            None,
            Some(ORIGIN_SERVICE_VALUE),
        ));

    // An unparsable field path is reported the same way as a missing one.
    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
        path: config.field().to_string(),
    })?;

    // Missing and explicit-null fields are distinct errors (distinct telemetry).
    let value = match log.get(&field) {
        None => Err(TransformError::PathNotFound {
            path: field.to_string(),
        }),
        Some(Value::Null) => Err(TransformError::PathNull {
            path: field.to_string(),
        }),
        Some(value) => Ok(value),
    }?;

    // Metric name defaults to the source field template when no name is set.
    let name = config.name.as_ref().unwrap_or(&config.field);
    let name = render_template(name, event)?;

    let namespace = config.namespace.as_ref();
    let namespace = namespace
        .map(|namespace| render_template(namespace, event))
        .transpose()?;

    let tags = render_tags(&config.tags, event)?;

    let (kind, value) = match &config.metric {
        MetricTypeConfig::Counter(counter) => {
            let value = if counter.increment_by_value {
                // NOTE(review): this branch reports the raw template string as the
                // path, while the branches below report the parsed path — confirm
                // whether the inconsistency in emitted telemetry is intentional.
                value.to_string_lossy().parse().map_err(|error| {
                    TransformError::ParseFloatError {
                        path: config.field.get_ref().to_owned(),
                        error,
                    }
                })?
            } else {
                1.0
            };

            (counter.kind, MetricValue::Counter { value })
        }
        MetricTypeConfig::Histogram => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            // A single-sample distribution with histogram statistics.
            (
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![value => 1],
                    statistic: StatisticKind::Histogram,
                },
            )
        }
        MetricTypeConfig::Summary => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            // A single-sample distribution with summary statistics.
            (
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![value => 1],
                    statistic: StatisticKind::Summary,
                },
            )
        }
        MetricTypeConfig::Gauge => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            // Gauges represent the current value, hence absolute kind.
            (MetricKind::Absolute, MetricValue::Gauge { value })
        }
        MetricTypeConfig::Set => {
            // Sets record the string form of the value; no parsing required.
            let value = value.to_string_lossy().into_owned();

            (
                MetricKind::Incremental,
                MetricValue::Set {
                    values: std::iter::once(value).collect(),
                },
            )
        }
    };
    Ok(Metric::new_with_metadata(name, kind, value, metadata)
        .with_namespace(namespace)
        .with_tags(tags)
        .with_timestamp(timestamp))
}
490
491fn bytes_to_str(value: &Value) -> Option<String> {
492 match value {
493 Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
494 _ => None,
495 }
496}
497
498fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
499 let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
501 PathParseError::InvalidPathSyntax { path } => PathNotFound {
502 path: path.to_string(),
503 },
504 })?;
505 match maybe_value {
506 None => Err(PathNotFound {
507 path: path.to_string(),
508 }),
509 Some(v) => Ok(bytes_to_str(v)),
510 }
511}
512
513fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
514 let counter_value = log
515 .get(event_path!("counter", "value"))
516 .ok_or_else(|| TransformError::PathNotFound {
517 path: "counter.value".to_string(),
518 })?
519 .as_float()
520 .ok_or_else(|| TransformError::ParseError {
521 path: "counter.value".to_string(),
522 kind: TransformParseErrorKind::FloatError,
523 })?;
524
525 Ok(MetricValue::Counter {
526 value: *counter_value,
527 })
528}
529
530fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
531 let gauge_value = log
532 .get(event_path!("gauge", "value"))
533 .ok_or_else(|| TransformError::PathNotFound {
534 path: "gauge.value".to_string(),
535 })?
536 .as_float()
537 .ok_or_else(|| TransformError::ParseError {
538 path: "gauge.value".to_string(),
539 kind: TransformParseErrorKind::FloatError,
540 })?;
541 Ok(MetricValue::Gauge {
542 value: *gauge_value,
543 })
544}
545
546fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
547 let set_values = log
548 .get(event_path!("set", "values"))
549 .ok_or_else(|| TransformError::PathNotFound {
550 path: "set.values".to_string(),
551 })?
552 .as_array()
553 .ok_or_else(|| TransformError::ParseError {
554 path: "set.values".to_string(),
555 kind: TransformParseErrorKind::ArrayError,
556 })?;
557
558 let mut values: Vec<String> = Vec::new();
559 for e_value in set_values {
560 let value = e_value
561 .as_bytes()
562 .ok_or_else(|| TransformError::ParseError {
563 path: "set.values".to_string(),
564 kind: TransformParseErrorKind::ArrayError,
565 })?;
566 values.push(String::from_utf8_lossy(value).to_string());
567 }
568
569 Ok(MetricValue::Set {
570 values: values.into_iter().collect(),
571 })
572}
573
/// Reads `distribution.samples` (value/rate pairs) and `distribution.statistic`
/// ("histogram" or "summary") from a self-describing metric event.
fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_samples = log
        .get(event_path!("distribution", "samples"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "distribution.samples".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "distribution.samples".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut samples: Vec<Sample> = Vec::new();
    for e_sample in event_samples {
        // Each sample object must provide a float `value` and integer `rate`.
        let value = e_sample
            .get(path!("value"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "value".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "value".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let rate = e_sample
            .get(path!("rate"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "rate".to_string(),
            })?
            .as_integer()
            .ok_or_else(|| TransformError::ParseError {
                path: "rate".to_string(),
                kind: TransformParseErrorKind::IntError,
            })?;

        samples.push(Sample {
            value: *value,
            // NOTE(review): `as u32` truncates out-of-range/negative rates silently.
            rate: rate as u32,
        });
    }

    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "distribution.statistic".to_string(),
            });
        }
    };
    // Any value other than the two known statistics is a hard error.
    let statistic_kind = match statistic_str.as_str() {
        "histogram" => Ok(StatisticKind::Histogram),
        "summary" => Ok(StatisticKind::Summary),
        _ => Err(TransformError::MetricValueError {
            path: "distribution.statistic".to_string(),
            path_value: statistic_str.to_string(),
        }),
    }?;

    Ok(MetricValue::Distribution {
        samples,
        statistic: statistic_kind,
    })
}
638
/// Reads `aggregated_histogram.{buckets,count,sum}` from a self-describing
/// metric event into an `AggregatedHistogram` value.
fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_buckets = log
        .get(event_path!("aggregated_histogram", "buckets"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_histogram.buckets".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_histogram.buckets".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut buckets: Vec<Bucket> = Vec::new();
    for e_bucket in event_buckets {
        // Each bucket object must provide a float `upper_limit` and integer `count`.
        let upper_limit = e_bucket
            .get(path!("upper_limit"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "aggregated_histogram.buckets.upper_limit".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "aggregated_histogram.buckets.upper_limit".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let count = e_bucket
            .get(path!("count"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "aggregated_histogram.buckets.count".to_string(),
            })?
            .as_integer()
            .ok_or_else(|| TransformError::ParseError {
                path: "aggregated_histogram.buckets.count".to_string(),
                kind: TransformParseErrorKind::IntError,
            })?;

        buckets.push(Bucket {
            upper_limit: *upper_limit,
            // NOTE(review): `as u64` wraps negative counts silently.
            count: count as u64,
        });
    }

    // Top-level totals for the histogram.
    let count = log
        .get(event_path!("aggregated_histogram", "count"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_histogram.count".to_string(),
        })?
        .as_integer()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_histogram.count".to_string(),
            kind: TransformParseErrorKind::IntError,
        })?;

    let sum = log
        .get(event_path!("aggregated_histogram", "sum"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_histogram.sum".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_histogram.sum".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;

    Ok(MetricValue::AggregatedHistogram {
        buckets,
        count: count as u64,
        sum: *sum,
    })
}
709
/// Reads `aggregated_summary.{quantiles,count,sum}` from a self-describing
/// metric event into an `AggregatedSummary` value.
fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_quantiles = log
        .get(event_path!("aggregated_summary", "quantiles"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_summary.quantiles".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_summary.quantiles".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut quantiles: Vec<Quantile> = Vec::new();
    for e_quantile in event_quantiles {
        // Each quantile object must provide float `quantile` and `value` fields.
        let quantile = e_quantile
            .get(path!("quantile"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "aggregated_summary.quantiles.quantile".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "aggregated_summary.quantiles.quantile".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let value = e_quantile
            .get(path!("value"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "aggregated_summary.quantiles.value".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "aggregated_summary.quantiles.value".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        quantiles.push(Quantile {
            quantile: *quantile,
            value: *value,
        })
    }

    // Top-level totals for the summary.
    let count = log
        .get(event_path!("aggregated_summary", "count"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_summary.count".to_string(),
        })?
        .as_integer()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_summary.count".to_string(),
            kind: TransformParseErrorKind::IntError,
        })?;

    let sum = log
        .get(event_path!("aggregated_summary", "sum"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "aggregated_summary.sum".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "aggregated_summary.sum".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;

    Ok(MetricValue::AggregatedSummary {
        quantiles,
        count: count as u64,
        sum: *sum,
    })
}
780
/// Converts a self-describing log event into a metric (the `all_metrics` mode).
///
/// Expects `name` and `kind` fields plus exactly one metric-detail object
/// (`counter`, `gauge`, `set`, `distribution`, `aggregated_histogram`, or
/// `aggregated_summary`); `tags` and `namespace` are optional.
fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
    let log = event.as_log();
    // Use the log's timestamp when present, otherwise stamp with "now".
    let timestamp = log
        .get_timestamp()
        .and_then(Value::as_timestamp)
        .cloned()
        .or_else(|| Some(Utc::now()));

    let name = match try_get_string_from_log(log, "name")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "name".to_string(),
            });
        }
    };

    let mut tags = MetricTags::default();

    // Optional `tags` object: non-string values become `None` tag values.
    if let Some(els) = log.get(event_path!("tags"))
        && let Some(el) = els.as_object()
    {
        for (key, value) in el {
            tags.insert(key.to_string(), bytes_to_str(value));
        }
    }
    let tags_result = Some(tags);

    let kind_str = match try_get_string_from_log(log, "kind")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "kind".to_string(),
            });
        }
    };

    // Only the two canonical kind strings are accepted.
    let kind = match kind_str.as_str() {
        "absolute" => Ok(MetricKind::Absolute),
        "incremental" => Ok(MetricKind::Incremental),
        value => Err(TransformError::MetricValueError {
            path: "kind".to_string(),
            path_value: value.to_string(),
        }),
    }?;

    // Scan the event's top-level keys for the first recognized metric detail.
    let mut value: Option<MetricValue> = None;
    if let Some(root_event) = log.as_map() {
        for key in root_event.keys() {
            value = match key.as_str() {
                "gauge" => Some(get_gauge_value(log)?),
                "distribution" => Some(get_distribution_value(log)?),
                "aggregated_histogram" => Some(get_histogram_value(log)?),
                "aggregated_summary" => Some(get_summary_value(log)?),
                "counter" => Some(get_counter_value(log)?),
                "set" => Some(get_set_value(log)?),
                _ => None,
            };

            if value.is_some() {
                break;
            }
        }
    }

    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;

    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
        .with_tags(tags_result)
        .with_timestamp(timestamp);

    // Namespace is best-effort: a lookup failure simply leaves it unset.
    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
        metric = metric.with_namespace(namespace);
    }

    Ok(metric)
}
858
impl FunctionTransform for LogToMetric {
    /// Converts one log event into metric event(s).
    ///
    /// In `all_metrics` mode, a failed conversion drops the event after emitting
    /// the matching internal error event. In per-config mode, *any* config
    /// failing drops the entire event (the early `return` discards metrics
    /// already buffered from earlier configs).
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        // NOTE(review): `buffer` is only used in the per-config branch below;
        // this allocation is wasted in `all_metrics` mode (capacity 0 there anyway).
        let mut buffer = Vec::with_capacity(self.metrics.len());
        if self.all_metrics {
            match to_metrics(&event) {
                Ok(metric) => {
                    output.push(Event::Metric(metric));
                }
                Err(err) => {
                    // Route each error variant to its dedicated internal event.
                    match err {
                        TransformError::MetricValueError { path, path_value } => {
                            emit!(MetricMetadataInvalidFieldValueError {
                                field: path.as_ref(),
                                field_value: path_value.as_ref()
                            })
                        }
                        TransformError::PathNotFound { path } => {
                            emit!(ParserMissingFieldError::<DROP_EVENT> {
                                field: path.as_ref()
                            })
                        }
                        TransformError::ParseError { path, kind } => {
                            emit!(MetricMetadataParseError {
                                field: path.as_ref(),
                                kind: &kind.to_string(),
                            })
                        }
                        TransformError::MetricDetailsNotFound => {
                            emit!(MetricMetadataMetricDetailsNotFoundError {})
                        }
                        TransformError::PairExpansionError { key, value, error } => {
                            emit!(crate::internal_events::PairExpansionError {
                                key: &key,
                                value: &value,
                                drop_event: true,
                                error
                            })
                        }
                        // Variants not produced by `to_metrics` are ignored.
                        _ => {}
                    };
                }
            }
        } else {
            for config in self.metrics.iter() {
                match to_metric_with_config(config, &event) {
                    Ok(metric) => {
                        buffer.push(Event::Metric(metric));
                    }
                    Err(err) => {
                        // Route each error variant to its dedicated internal event.
                        match err {
                            TransformError::PathNull { path } => {
                                emit!(LogToMetricFieldNullError {
                                    field: path.as_ref()
                                })
                            }
                            TransformError::PathNotFound { path } => {
                                emit!(ParserMissingFieldError::<DROP_EVENT> {
                                    field: path.as_ref()
                                })
                            }
                            TransformError::ParseFloatError { path, error } => {
                                emit!(LogToMetricParseFloatError {
                                    field: path.as_ref(),
                                    error
                                })
                            }
                            TransformError::TemplateRenderingError(error) => {
                                emit!(crate::internal_events::TemplateRenderingError {
                                    error,
                                    drop_event: true,
                                    field: None,
                                })
                            }
                            TransformError::PairExpansionError { key, value, error } => {
                                emit!(crate::internal_events::PairExpansionError {
                                    key: &key,
                                    value: &value,
                                    drop_event: true,
                                    error
                                })
                            }
                            // Variants not produced by `to_metric_with_config` are ignored.
                            _ => {}
                        };
                        // Drop the whole event (including already-buffered metrics).
                        return;
                    }
                }
            }
        }

        // Only reached when every config succeeded (or in all_metrics mode,
        // where the buffer is empty).
        for event in buffer {
            output.push(event);
        }
    }
}
956
957#[cfg(test)]
958mod tests {
959 use std::{sync::Arc, time::Duration};
960
961 use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
962 use similar_asserts::assert_eq;
963 use tokio::sync::mpsc;
964 use tokio_stream::wrappers::ReceiverStream;
965 use vector_lib::{
966 config::ComponentKey,
967 event::{EventMetadata, ObjectMap},
968 metric_tags,
969 };
970
971 use super::*;
972 use crate::{
973 config::log_schema,
974 event::{
975 Event, LogEvent,
976 metric::{Metric, MetricKind, MetricValue, StatisticKind},
977 },
978 test_util::components::assert_transform_compliance,
979 transforms::test::create_topology,
980 };
981
    // Identifiers used to fake source/upstream metadata on test events.
    const TEST_SOURCE_COMPONENT_ID: &str = "in";
    const TEST_UPSTREAM_COMPONENT_ID: &str = "transform";
    const TEST_SOURCE_TYPE: &str = "unit_test_stream";
    const TEST_NAMESPACE: &str = "test_namespace";

    // Smoke-test that the example config round-trips through serde.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogToMetricConfig>();
    }

    // Parses a TOML config snippet, panicking on invalid input.
    fn parse_config(s: &str) -> LogToMetricConfig {
        toml::from_str(s).unwrap()
    }

    // Parses a YAML config snippet, panicking on invalid input.
    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
        serde_yaml::from_str(s).unwrap()
    }

    // Fixed timestamp used by all tests so assertions are deterministic.
    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp")
    }

    // Builds a log event with one `key = value` field and the fixed timestamp.
    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
        let mut log = Event::Log(LogEvent::from("i am a log"));
        log.as_mut_log().insert(key, value);
        log.as_mut_log()
            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
        log
    }
1014
    // Stamps the metadata fields that the test topology would normally set.
    fn set_test_source_metadata(metadata: &mut EventMetadata) {
        metadata.set_upstream_id(Arc::new(OutputId::from(TEST_UPSTREAM_COMPONENT_ID)));
        metadata.set_source_id(Arc::new(ComponentKey::from(TEST_SOURCE_COMPONENT_ID)));
        metadata.set_source_type(TEST_SOURCE_TYPE);
    }

    // Runs one event through a topology built from `config` and returns the
    // single output event (or `None` on timeout/no output).
    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
            tx.send(event).await.unwrap();
            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
                .await
                .unwrap_or(None);
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
            result
        })
        .await
    }

    // Like `do_transform` but collects up to `count` output events.
    async fn do_transform_multiple_events(
        config: LogToMetricConfig,
        event: Event,
        count: usize,
    ) -> Vec<Event> {
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
            tx.send(event).await.unwrap();

            let mut results = vec![];
            for _ in 0..count {
                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
                    .await
                    .unwrap_or(None);
                if let Some(event) = result {
                    results.push(event);
                }
            }

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
            results
        })
        .await
    }
1064
    // A counter without `increment_by_value` increments by 1 regardless of the
    // field's value.
    #[tokio::test]
    async fn count_http_status_codes() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"
            "#,
        );

        let event = create_event("status", "42");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        set_test_source_metadata(&mut metadata);
        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "status",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    // Tag templates render against the event; an unrenderable tag
    // (`missing_tag`) is silently skipped rather than failing the event.
    #[tokio::test]
    async fn count_http_requests_with_tags() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        event.as_mut_log().insert("method", "post");
        event.as_mut_log().insert("code", "200");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        set_test_source_metadata(&mut metadata);

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "http_requests_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("app"))
            .with_tags(Some(metric_tags!(
                "method" => "post",
                "code" => "200",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
1150
    // A `*` tag key expands an object-valued template into one tag per entry.
    #[tokio::test]
    async fn count_http_requests_with_tags_expansion() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {"*" = "{{ dict }}"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        let log = event.as_mut_log();

        let mut test_dict = ObjectMap::default();
        test_dict.insert("one".into(), Value::from("foo"));
        test_dict.insert("two".into(), Value::from("baz"));
        log.insert("dict", Value::from(test_dict));

        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        set_test_source_metadata(&mut metadata);

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "http_requests_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("app"))
            .with_tags(Some(metric_tags!(
                "one" => "foo",
                "two" => "baz",
            )))
            .with_timestamp(Some(ts()))
        );
    }
    // When a prefixed expansion and a bare `*` expansion produce the same tag
    // key, both values are retained under that key (multi-value tag).
    #[tokio::test]
    async fn count_http_requests_with_colliding_dynamic_tags() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        let log = event.as_mut_log();

        let mut map1 = ObjectMap::default();
        map1.insert("key1".into(), Value::from("val1"));
        log.insert("map1", Value::from(map1));

        let mut map2 = ObjectMap::default();
        map2.insert("l1_key1".into(), Value::from("val2"));
        log.insert("map2", Value::from(map2));

        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        set_test_source_metadata(&mut metadata);

        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");

        assert_eq!(tags.iter_all().count(), 2);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "l1_key1");
            assert!(value == Some("val1") || value == Some("val2"));
        }
    }
    #[tokio::test]
    async fn multi_value_tags_yaml() {
        // A YAML list as a tag value produces a multi-value tag; the `null`
        // entry becomes a bare (valueless) tag.
        let config = parse_yaml_config(
            r#"
            metrics:
            - field: "message"
              type: "counter"
              tags:
                tag:
                  - "one"
                  - null
                  - "two"
            "#,
        );

        let event = create_event("message", "I am log");
        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        // The single-value view yields only the last concrete value.
        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);

        // All three entries (including the bare one) survive in the full view.
        assert_eq!(tags.iter_all().count(), 3);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "tag");
            assert!(value.is_none() || value == Some("one") || value == Some("two"));
        }
    }
    #[tokio::test]
    async fn multi_value_tags_expansion_yaml() {
        // The "*" wildcard expands every key of the referenced map into a
        // tag; a non-scalar value is rendered as its JSON representation.
        let config = parse_yaml_config(
            r#"
            metrics:
            - field: "message"
              type: "counter"
              tags:
                "*": "{{dict}}"
            "#,
        );

        let mut event = create_event("message", "I am log");
        let log = event.as_mut_log();

        // `dict.one` holds an array, which is expected to serialize to JSON
        // text in the tag value below.
        let mut test_dict = ObjectMap::default();
        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
        log.insert("dict", Value::from(test_dict));

        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(
            tags.iter_single().collect::<Vec<_>>(),
            vec![("one", "[\"foo\",\"baz\"]")]
        );

        assert_eq!(tags.iter_all().count(), 1);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "one");
            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
        }
    }
1313
1314 #[tokio::test]
1315 async fn multi_value_tags_toml() {
1316 let config = parse_config(
1317 r#"
1318 [[metrics]]
1319 field = "message"
1320 type = "counter"
1321 [metrics.tags]
1322 tag = ["one", "two"]
1323 "#,
1324 );
1325
1326 let event = create_event("message", "I am log");
1327 let metric = do_transform(config, event).await.unwrap().into_metric();
1328 let tags = metric.tags().expect("Metric should have tags");
1329
1330 assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1331
1332 assert_eq!(tags.iter_all().count(), 2);
1333 for (name, value) in tags.iter_all() {
1334 assert_eq!(name, "tag");
1335 assert!(value == Some("one") || value == Some("two"));
1336 }
1337 }
1338
1339 #[tokio::test]
1340 async fn count_exceptions() {
1341 let config = parse_config(
1342 r#"
1343 [[metrics]]
1344 type = "counter"
1345 field = "backtrace"
1346 name = "exception_total"
1347 "#,
1348 );
1349
1350 let event = create_event("backtrace", "message");
1351 let mut metadata =
1352 event
1353 .metadata()
1354 .clone()
1355 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1356 None,
1357 None,
1358 Some(ORIGIN_SERVICE_VALUE),
1359 ));
1360 metadata.set_schema_definition(&Arc::new(Definition::any()));
1362 set_test_source_metadata(&mut metadata);
1363
1364 let metric = do_transform(config, event).await.unwrap();
1365
1366 assert_eq!(
1367 metric.into_metric(),
1368 Metric::new_with_metadata(
1369 "exception_total",
1370 MetricKind::Incremental,
1371 MetricValue::Counter { value: 1.0 },
1372 metadata
1373 )
1374 .with_timestamp(Some(ts()))
1375 );
1376 }
1377
1378 #[tokio::test]
1379 async fn count_exceptions_no_match() {
1380 let config = parse_config(
1381 r#"
1382 [[metrics]]
1383 type = "counter"
1384 field = "backtrace"
1385 name = "exception_total"
1386 "#,
1387 );
1388
1389 let event = create_event("success", "42");
1390 assert_eq!(do_transform(config, event).await, None);
1391 }
1392
1393 #[tokio::test]
1394 async fn sum_order_amounts() {
1395 let config = parse_config(
1396 r#"
1397 [[metrics]]
1398 type = "counter"
1399 field = "amount"
1400 name = "amount_total"
1401 increment_by_value = true
1402 "#,
1403 );
1404
1405 let event = create_event("amount", "33.99");
1406 let mut metadata =
1407 event
1408 .metadata()
1409 .clone()
1410 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1411 None,
1412 None,
1413 Some(ORIGIN_SERVICE_VALUE),
1414 ));
1415 metadata.set_schema_definition(&Arc::new(Definition::any()));
1417 set_test_source_metadata(&mut metadata);
1418 let metric = do_transform(config, event).await.unwrap();
1419
1420 assert_eq!(
1421 metric.into_metric(),
1422 Metric::new_with_metadata(
1423 "amount_total",
1424 MetricKind::Incremental,
1425 MetricValue::Counter { value: 33.99 },
1426 metadata,
1427 )
1428 .with_timestamp(Some(ts()))
1429 );
1430 }
1431
1432 #[tokio::test]
1433 async fn count_absolute() {
1434 let config = parse_config(
1435 r#"
1436 [[metrics]]
1437 type = "counter"
1438 field = "amount"
1439 name = "amount_total"
1440 increment_by_value = true
1441 kind = "absolute"
1442 "#,
1443 );
1444
1445 let event = create_event("amount", "33.99");
1446 let mut metadata =
1447 event
1448 .metadata()
1449 .clone()
1450 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1451 None,
1452 None,
1453 Some(ORIGIN_SERVICE_VALUE),
1454 ));
1455 metadata.set_schema_definition(&Arc::new(Definition::any()));
1457 set_test_source_metadata(&mut metadata);
1458
1459 let metric = do_transform(config, event).await.unwrap();
1460
1461 assert_eq!(
1462 metric.into_metric(),
1463 Metric::new_with_metadata(
1464 "amount_total",
1465 MetricKind::Absolute,
1466 MetricValue::Counter { value: 33.99 },
1467 metadata,
1468 )
1469 .with_timestamp(Some(ts()))
1470 );
1471 }
1472
1473 #[tokio::test]
1474 async fn memory_usage_gauge() {
1475 let config = parse_config(
1476 r#"
1477 [[metrics]]
1478 type = "gauge"
1479 field = "memory_rss"
1480 name = "memory_rss_bytes"
1481 "#,
1482 );
1483
1484 let event = create_event("memory_rss", "123");
1485 let mut metadata =
1486 event
1487 .metadata()
1488 .clone()
1489 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1490 None,
1491 None,
1492 Some(ORIGIN_SERVICE_VALUE),
1493 ));
1494
1495 metadata.set_schema_definition(&Arc::new(Definition::any()));
1497
1498 set_test_source_metadata(&mut metadata);
1499
1500 let metric = do_transform(config, event).await.unwrap();
1501
1502 assert_eq!(
1503 metric.into_metric(),
1504 Metric::new_with_metadata(
1505 "memory_rss_bytes",
1506 MetricKind::Absolute,
1507 MetricValue::Gauge { value: 123.0 },
1508 metadata,
1509 )
1510 .with_timestamp(Some(ts()))
1511 );
1512 }
1513
1514 #[tokio::test]
1515 async fn parse_failure() {
1516 let config = parse_config(
1517 r#"
1518 [[metrics]]
1519 type = "counter"
1520 field = "status"
1521 name = "status_total"
1522 increment_by_value = true
1523 "#,
1524 );
1525
1526 let event = create_event("status", "not a number");
1527 assert_eq!(do_transform(config, event).await, None);
1528 }
1529
1530 #[tokio::test]
1531 async fn missing_field() {
1532 let config = parse_config(
1533 r#"
1534 [[metrics]]
1535 type = "counter"
1536 field = "status"
1537 name = "status_total"
1538 "#,
1539 );
1540
1541 let event = create_event("not foo", "not a number");
1542 assert_eq!(do_transform(config, event).await, None);
1543 }
1544
1545 #[tokio::test]
1546 async fn null_field() {
1547 let config = parse_config(
1548 r#"
1549 [[metrics]]
1550 type = "counter"
1551 field = "status"
1552 name = "status_total"
1553 "#,
1554 );
1555
1556 let event = create_event("status", Value::Null);
1557 assert_eq!(do_transform(config, event).await, None);
1558 }
1559
1560 #[tokio::test]
1561 async fn multiple_metrics() {
1562 let config = parse_config(
1563 r#"
1564 [[metrics]]
1565 type = "counter"
1566 field = "status"
1567
1568 [[metrics]]
1569 type = "counter"
1570 field = "backtrace"
1571 name = "exception_total"
1572 "#,
1573 );
1574
1575 let mut event = Event::Log(LogEvent::from("i am a log"));
1576 event
1577 .as_mut_log()
1578 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1579 event.as_mut_log().insert("status", "42");
1580 event.as_mut_log().insert("backtrace", "message");
1581 let mut metadata =
1582 event
1583 .metadata()
1584 .clone()
1585 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1586 None,
1587 None,
1588 Some(ORIGIN_SERVICE_VALUE),
1589 ));
1590
1591 metadata.set_schema_definition(&Arc::new(Definition::any()));
1593 set_test_source_metadata(&mut metadata);
1594
1595 let output = do_transform_multiple_events(config, event, 2).await;
1596
1597 assert_eq!(2, output.len());
1598 assert_eq!(
1599 output[0].clone().into_metric(),
1600 Metric::new_with_metadata(
1601 "status",
1602 MetricKind::Incremental,
1603 MetricValue::Counter { value: 1.0 },
1604 metadata.clone(),
1605 )
1606 .with_timestamp(Some(ts()))
1607 );
1608 assert_eq!(
1609 output[1].clone().into_metric(),
1610 Metric::new_with_metadata(
1611 "exception_total",
1612 MetricKind::Incremental,
1613 MetricValue::Counter { value: 1.0 },
1614 metadata,
1615 )
1616 .with_timestamp(Some(ts()))
1617 );
1618 }
1619
    #[tokio::test]
    async fn multiple_metrics_with_multiple_templates() {
        // Metric `name` and `namespace` are templates rendered from event
        // fields (`{{host}}`, `{{worker}}`, `{{service}}`).
        let config = parse_config(
            r#"
            [[metrics]]
            type = "set"
            field = "status"
            name = "{{host}}_{{worker}}_status_set"

            [[metrics]]
            type = "counter"
            field = "backtrace"
            name = "{{service}}_exception_total"
            namespace = "{{host}}"
            "#,
        );

        let mut event = Event::Log(LogEvent::from("i am a log"));
        event
            .as_mut_log()
            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
        event.as_mut_log().insert("status", "42");
        event.as_mut_log().insert("backtrace", "message");
        // Fields consumed by the name/namespace templates above.
        event.as_mut_log().insert("host", "local");
        event.as_mut_log().insert("worker", "abc");
        event.as_mut_log().insert("service", "xyz");
        // Metadata both emitted metrics are expected to carry.
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));
        set_test_source_metadata(&mut metadata);

        let output = do_transform_multiple_events(config, event, 2).await;

        assert_eq!(2, output.len());
        // First metric: set whose name is rendered from host/worker.
        assert_eq!(
            output[0].as_metric(),
            &Metric::new_with_metadata(
                "local_abc_status_set",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["42".into()].into_iter().collect()
                },
                metadata.clone(),
            )
            .with_timestamp(Some(ts()))
        );
        // Second metric: counter with templated name and namespace.
        assert_eq!(
            output[1].as_metric(),
            &Metric::new_with_metadata(
                "xyz_exception_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("local"))
            .with_timestamp(Some(ts()))
        );
    }
1687
1688 #[tokio::test]
1689 async fn user_ip_set() {
1690 let config = parse_config(
1691 r#"
1692 [[metrics]]
1693 type = "set"
1694 field = "user_ip"
1695 name = "unique_user_ip"
1696 "#,
1697 );
1698
1699 let event = create_event("user_ip", "1.2.3.4");
1700 let mut metadata =
1701 event
1702 .metadata()
1703 .clone()
1704 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1705 None,
1706 None,
1707 Some(ORIGIN_SERVICE_VALUE),
1708 ));
1709 metadata.set_schema_definition(&Arc::new(Definition::any()));
1711 set_test_source_metadata(&mut metadata);
1712
1713 let metric = do_transform(config, event).await.unwrap();
1714
1715 assert_eq!(
1716 metric.into_metric(),
1717 Metric::new_with_metadata(
1718 "unique_user_ip",
1719 MetricKind::Incremental,
1720 MetricValue::Set {
1721 values: vec!["1.2.3.4".into()].into_iter().collect()
1722 },
1723 metadata,
1724 )
1725 .with_timestamp(Some(ts()))
1726 );
1727 }
1728
1729 #[tokio::test]
1730 async fn response_time_histogram() {
1731 let config = parse_config(
1732 r#"
1733 [[metrics]]
1734 type = "histogram"
1735 field = "response_time"
1736 "#,
1737 );
1738
1739 let event = create_event("response_time", "2.5");
1740 let mut metadata =
1741 event
1742 .metadata()
1743 .clone()
1744 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1745 None,
1746 None,
1747 Some(ORIGIN_SERVICE_VALUE),
1748 ));
1749
1750 metadata.set_schema_definition(&Arc::new(Definition::any()));
1752 set_test_source_metadata(&mut metadata);
1753
1754 let metric = do_transform(config, event).await.unwrap();
1755
1756 assert_eq!(
1757 metric.into_metric(),
1758 Metric::new_with_metadata(
1759 "response_time",
1760 MetricKind::Incremental,
1761 MetricValue::Distribution {
1762 samples: vector_lib::samples![2.5 => 1],
1763 statistic: StatisticKind::Histogram
1764 },
1765 metadata
1766 )
1767 .with_timestamp(Some(ts()))
1768 );
1769 }
1770
1771 #[tokio::test]
1772 async fn response_time_summary() {
1773 let config = parse_config(
1774 r#"
1775 [[metrics]]
1776 type = "summary"
1777 field = "response_time"
1778 "#,
1779 );
1780
1781 let event = create_event("response_time", "2.5");
1782 let mut metadata =
1783 event
1784 .metadata()
1785 .clone()
1786 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1787 None,
1788 None,
1789 Some(ORIGIN_SERVICE_VALUE),
1790 ));
1791
1792 metadata.set_schema_definition(&Arc::new(Definition::any()));
1794 set_test_source_metadata(&mut metadata);
1795
1796 let metric = do_transform(config, event).await.unwrap();
1797
1798 assert_eq!(
1799 metric.into_metric(),
1800 Metric::new_with_metadata(
1801 "response_time",
1802 MetricKind::Incremental,
1803 MetricValue::Distribution {
1804 samples: vector_lib::samples![2.5 => 1],
1805 statistic: StatisticKind::Summary
1806 },
1807 metadata
1808 )
1809 .with_timestamp(Some(ts()))
1810 );
1811 }
1812
    // Builds a log `Event` from a JSON string with the default test
    // namespace attached; see `create_log_event_with_namespace`.
    fn create_log_event(json_str: &str) -> Event {
        create_log_event_with_namespace(json_str, Some(TEST_NAMESPACE))
    }
1818
1819 fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1820 let mut log_value: Value =
1821 serde_json::from_str(json_str).expect("JSON was not well-formatted");
1822 log_value.insert("timestamp", ts());
1823
1824 if let Some(namespace) = namespace {
1825 log_value.insert("namespace", namespace);
1826 }
1827
1828 let mut metadata = EventMetadata::default();
1829 set_test_source_metadata(&mut metadata);
1830
1831 Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1832 }
1833
1834 #[tokio::test]
1835 async fn transform_gauge() {
1836 let config = LogToMetricConfig {
1837 metrics: None,
1838 all_metrics: Some(true),
1839 };
1840
1841 let json_str = r#"{
1842 "gauge": {
1843 "value": 990.0
1844 },
1845 "kind": "absolute",
1846 "name": "test.transform.gauge",
1847 "tags": {
1848 "env": "test_env",
1849 "host": "localhost"
1850 }
1851 }"#;
1852 let log = create_log_event(json_str);
1853 let metric = do_transform(config, log.clone()).await.unwrap();
1854 assert_eq!(
1855 *metric.as_metric(),
1856 Metric::new_with_metadata(
1857 "test.transform.gauge",
1858 MetricKind::Absolute,
1859 MetricValue::Gauge { value: 990.0 },
1860 metric.metadata().clone(),
1861 )
1862 .with_namespace(Some(TEST_NAMESPACE))
1863 .with_tags(Some(metric_tags!(
1864 "env" => "test_env",
1865 "host" => "localhost",
1866 )))
1867 .with_timestamp(Some(ts()))
1868 );
1869 }
1870
    #[tokio::test]
    async fn transform_histogram() {
        // `all_metrics` mode: a log shaped like an aggregated histogram is
        // converted 1:1 into `MetricValue::AggregatedHistogram`, keeping the
        // sum, count, and every bucket.
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "aggregated_histogram": {
                "sum": 18.0,
                "count": 5,
                "buckets": [
                    {
                        "upper_limit": 1.0,
                        "count": 1
                    },
                    {
                        "upper_limit": 2.0,
                        "count": 2
                    },
                    {
                        "upper_limit": 5.0,
                        "count": 1
                    },
                    {
                        "upper_limit": 10.0,
                        "count": 1
                    }
                ]
            },
            "kind": "absolute",
            "name": "test.transform.histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        // Expected buckets mirror the JSON above, in the same order.
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.histogram",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    count: 5,
                    sum: 18.0,
                    buckets: vec![
                        Bucket {
                            upper_limit: 1.0,
                            count: 1,
                        },
                        Bucket {
                            upper_limit: 2.0,
                            count: 2,
                        },
                        Bucket {
                            upper_limit: 5.0,
                            count: 1,
                        },
                        Bucket {
                            upper_limit: 10.0,
                            count: 1,
                        },
                    ],
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some(TEST_NAMESPACE))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
1947
    #[tokio::test]
    async fn transform_distribution_histogram() {
        // `all_metrics` mode: a log with a `distribution` payload and
        // `statistic: "histogram"` becomes a `MetricValue::Distribution`
        // with `StatisticKind::Histogram`.
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "distribution": {
                "samples": [
                    {
                        "value": 1.0,
                        "rate": 1
                    },
                    {
                        "value": 2.0,
                        "rate": 2
                    }
                ],
                "statistic": "histogram"
            },
            "kind": "absolute",
            "name": "test.transform.distribution_histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        // Expected samples mirror the JSON above, in the same order.
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.distribution_histogram",
                MetricKind::Absolute,
                MetricValue::Distribution {
                    samples: vec![
                        Sample {
                            value: 1.0,
                            rate: 1
                        },
                        Sample {
                            value: 2.0,
                            rate: 2
                        },
                    ],
                    statistic: StatisticKind::Histogram,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some(TEST_NAMESPACE))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
2006
    #[tokio::test]
    async fn transform_distribution_summary() {
        // Same as `transform_distribution_histogram`, but with
        // `statistic: "summary"` mapping to `StatisticKind::Summary`.
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "distribution": {
                "samples": [
                    {
                        "value": 1.0,
                        "rate": 1
                    },
                    {
                        "value": 2.0,
                        "rate": 2
                    }
                ],
                "statistic": "summary"
            },
            "kind": "absolute",
            "name": "test.transform.distribution_summary",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.distribution_summary",
                MetricKind::Absolute,
                MetricValue::Distribution {
                    samples: vec![
                        Sample {
                            value: 1.0,
                            rate: 1
                        },
                        Sample {
                            value: 2.0,
                            rate: 2
                        },
                    ],
                    statistic: StatisticKind::Summary,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some(TEST_NAMESPACE))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
2065
    #[tokio::test]
    async fn transform_summary() {
        // `all_metrics` mode: an `aggregated_summary` payload becomes
        // `MetricValue::AggregatedSummary` with its quantiles, count, sum.
        // NOTE(review): the metric name says "histogram" even though this is
        // the summary test — looks copy-pasted, but it is consistent between
        // input and expectation, so behavior is unaffected.
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "aggregated_summary": {
                "sum": 100.0,
                "count": 7,
                "quantiles": [
                    {
                        "quantile": 0.05,
                        "value": 10.0
                    },
                    {
                        "quantile": 0.95,
                        "value": 25.0
                    }
                ]
            },
            "kind": "absolute",
            "name": "test.transform.histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        // Expected quantiles mirror the JSON above, in the same order.
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.histogram",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vec![
                        Quantile {
                            quantile: 0.05,
                            value: 10.0,
                        },
                        Quantile {
                            quantile: 0.95,
                            value: 25.0,
                        },
                    ],
                    count: 7,
                    sum: 100.0,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some(TEST_NAMESPACE))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
2126
2127 #[tokio::test]
2128 async fn transform_counter() {
2129 let config = LogToMetricConfig {
2130 metrics: None,
2131 all_metrics: Some(true),
2132 };
2133
2134 let json_str = r#"{
2135 "counter": {
2136 "value": 10.0
2137 },
2138 "kind": "incremental",
2139 "name": "test.transform.counter",
2140 "tags": {
2141 "env": "test_env",
2142 "host": "localhost"
2143 }
2144 }"#;
2145 let log = create_log_event(json_str);
2146 let metric = do_transform(config, log.clone()).await.unwrap();
2147 assert_eq!(
2148 *metric.as_metric(),
2149 Metric::new_with_metadata(
2150 "test.transform.counter",
2151 MetricKind::Incremental,
2152 MetricValue::Counter { value: 10.0 },
2153 metric.metadata().clone(),
2154 )
2155 .with_namespace(Some(TEST_NAMESPACE))
2156 .with_tags(Some(metric_tags!(
2157 "env" => "test_env",
2158 "host" => "localhost",
2159 )))
2160 .with_timestamp(Some(ts()))
2161 );
2162 }
2163
    #[tokio::test]
    async fn transform_set() {
        // `all_metrics` mode: a log with a `set` payload converts into a
        // `MetricValue::Set` containing the listed values.
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "set": {
                "values": ["990.0", "1234"]
            },
            "kind": "incremental",
            "name": "test.transform.set",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.set",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some(TEST_NAMESPACE))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
2202
    #[tokio::test]
    async fn transform_all_metrics_optional_namespace() {
        // When the log carries no `namespace` field, the converted metric
        // must have no namespace (note the missing `.with_namespace` below).
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "counter": {
                "value": 10.0
            },
            "kind": "incremental",
            "name": "test.transform.counter",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        // `None` suppresses the namespace field the other tests add.
        let log = create_log_event_with_namespace(json_str, None);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 10.0 },
                metric.metadata().clone(),
            )
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
2238}