1use std::sync::Arc;
2use std::{collections::HashMap, num::ParseFloatError};
3
4use chrono::Utc;
5use indexmap::IndexMap;
6use vector_lib::configurable::configurable_component;
7use vector_lib::event::LogEvent;
8use vector_lib::{
9 config::LogNamespace,
10 event::DatadogMetricOriginMetadata,
11 event::{
12 metric::Sample,
13 metric::{Bucket, Quantile},
14 },
15};
16use vrl::path::{parse_target_path, PathParseError};
17use vrl::{event_path, path};
18
19use crate::config::schema::Definition;
20use crate::transforms::log_to_metric::TransformError::PathNotFound;
21use crate::{
22 common::expansion::pair_expansion,
23 config::{
24 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
25 TransformOutput,
26 },
27 event::{
28 metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
29 Event, Value,
30 },
31 internal_events::{
32 LogToMetricFieldNullError, LogToMetricParseFloatError,
33 MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
34 MetricMetadataParseError, ParserMissingFieldError, DROP_EVENT,
35 },
36 schema,
37 template::{Template, TemplateRenderingError},
38 transforms::{FunctionTransform, OutputBuffer, Transform},
39};
40
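/// Origin metadata "service" value attached to metrics produced by this transform.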
41const ORIGIN_SERVICE_VALUE: u32 = 3;
42
/// Configuration for the `log_to_metric` transform.
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate from matching log events.
    pub metrics: Option<Vec<MetricConfig>>,

    /// When enabled, every incoming log event is expected to already describe a
    /// metric (a `name`, a `kind`, optional `tags`, and a value object such as
    /// `counter` or `gauge`), and the `metrics` list above is ignored.
    pub all_metrics: Option<bool>,
}
80
/// Options for counters generated from a log field.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the numeric value of `field`, instead of only by `1`.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}
93
/// Specification of a metric derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// Name of the field in the event from which to generate the metric.
    pub field: Template,

    /// Overrides the name of the metric.
    ///
    /// If not specified, `field` is used as the name of the metric.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}
125
/// Specification of the value of a created tag.
///
/// This may be a single value, a `null` for a bare tag, or an array of either.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    Plain(Option<Template>),

    /// An array of values to give to the same tag name.
    Multi(Vec<Option<Template>>),
}
139
/// Specification of the type of an individual metric, and any associated data.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    Counter(CounterConfig),

    /// A histogram.
    Histogram,

    /// A gauge.
    Gauge,

    /// A set.
    Set,

    /// A summary.
    Summary,
}
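
// An illustrative TOML configuration for this transform (component names and
// values are placeholders, not taken from the documentation):
//
// [transforms.my_log_to_metric]
// type = "log_to_metric"
// inputs = ["my-source-id"]
//
// [[transforms.my_log_to_metric.metrics]]
// type = "counter"
// field = "status"
// name = "response_total"
// namespace = "app"
// tags = { status = "{{status}}", host = "{{host}}" }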
161
162impl MetricConfig {
163 fn field(&self) -> &str {
164 self.field.get_ref()
165 }
166}
167
168const fn default_increment_by_value() -> bool {
169 false
170}
171
172const fn default_kind() -> MetricKind {
173 MetricKind::Incremental
174}
175
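/// Runtime form of the `log_to_metric` transform, built from `LogToMetricConfig`.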
176#[derive(Debug, Clone)]
177pub struct LogToMetric {
178 pub metrics: Vec<MetricConfig>,
179 pub all_metrics: bool,
180}
181
182impl GenerateConfig for LogToMetricConfig {
183 fn generate_config() -> toml::Value {
184 toml::Value::try_from(Self {
185 metrics: Some(vec![MetricConfig {
186 field: "field_name".try_into().expect("Fixed template"),
187 name: None,
188 namespace: None,
189 tags: None,
190 metric: MetricTypeConfig::Counter(CounterConfig {
191 increment_by_value: false,
192 kind: MetricKind::Incremental,
193 }),
194 }]),
195 all_metrics: Some(true),
196 })
197 .unwrap()
198 }
199}
200
201#[async_trait::async_trait]
202#[typetag::serde(name = "log_to_metric")]
203impl TransformConfig for LogToMetricConfig {
204 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
205 Ok(Transform::function(LogToMetric {
206 metrics: self.metrics.clone().unwrap_or_default(),
207 all_metrics: self.all_metrics.unwrap_or_default(),
208 }))
209 }
210
211 fn input(&self) -> Input {
212 Input::log()
213 }
214
215 fn outputs(
216 &self,
217 _: vector_lib::enrichment::TableRegistry,
218 _: &[(OutputId, schema::Definition)],
219 _: LogNamespace,
220 ) -> Vec<TransformOutput> {
221 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
223 }
224
225 fn enable_concurrency(&self) -> bool {
226 true
227 }
228}
229
/// The kind of parse failure encountered while reading a metric field from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// The value could not be parsed as a float.
    FloatError,

    /// The value could not be parsed as an integer.
    IntError,

    /// The value could not be parsed as an array.
    ArrayError,
}
241
242impl std::fmt::Display for TransformParseErrorKind {
243 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
244 write!(f, "{self:?}")
245 }
246}
247
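/// Errors that can occur while converting a log event into a metric.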
248enum TransformError {
249 PathNotFound {
250 path: String,
251 },
252 PathNull {
253 path: String,
254 },
255 MetricDetailsNotFound,
256 MetricValueError {
257 path: String,
258 path_value: String,
259 },
260 ParseError {
261 path: String,
262 kind: TransformParseErrorKind,
263 },
264 ParseFloatError {
265 path: String,
266 error: ParseFloatError,
267 },
268 TemplateRenderingError(TemplateRenderingError),
269 PairExpansionError {
270 key: String,
271 value: String,
272 error: serde_json::Error,
273 },
274}
275
276fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
277 template
278 .render_string(event)
279 .map_err(TransformError::TemplateRenderingError)
280}
281
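/// Renders the configured tag templates against the event and collects the
/// results into `MetricTags`. A tag key may receive multiple values, either from
/// a `Multi` entry or from wildcard keys expanded via `pair_expansion`; collisions
/// between statically declared and dynamically expanded keys are logged.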
282fn render_tags(
283 tags: &Option<IndexMap<Template, TagConfig>>,
284 event: &Event,
285) -> Result<Option<MetricTags>, TransformError> {
286 let mut static_tags: HashMap<String, String> = HashMap::new();
287 let mut dynamic_tags: HashMap<String, String> = HashMap::new();
288 Ok(match tags {
289 None => None,
290 Some(tags) => {
291 let mut result = MetricTags::default();
292 for (name, config) in tags {
293 match config {
294 TagConfig::Plain(template) => {
295 render_tag_into(
296 event,
297 name,
298 template.as_ref(),
299 &mut result,
300 &mut static_tags,
301 &mut dynamic_tags,
302 )?;
303 }
304 TagConfig::Multi(vec) => {
305 for template in vec {
306 render_tag_into(
307 event,
308 name,
309 template.as_ref(),
310 &mut result,
311 &mut static_tags,
312 &mut dynamic_tags,
313 )?;
314 }
315 }
316 }
317 }
318 for (k, v) in static_tags {
319 if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
                    warn!(
                        "Static tag overrides dynamic tag. \
                         key: {}, value: {:?}, discarded value: {:?}",
                        k, v, discarded_v
                    );
325 };
326 }
327 result.as_option()
328 }
329 })
330}
331
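/// Renders a single tag key/value template pair into `result`. Template rendering
/// failures are reported and the tag is skipped, rather than failing the whole event.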
332fn render_tag_into(
333 event: &Event,
334 key_template: &Template,
335 value_template: Option<&Template>,
336 result: &mut MetricTags,
337 static_tags: &mut HashMap<String, String>,
338 dynamic_tags: &mut HashMap<String, String>,
339) -> Result<(), TransformError> {
340 let key = match render_template(key_template, event) {
341 Ok(key_s) => key_s,
342 Err(TransformError::TemplateRenderingError(err)) => {
343 emit!(crate::internal_events::TemplateRenderingError {
344 error: err,
345 drop_event: false,
346 field: Some(key_template.get_ref()),
347 });
348 return Ok(());
349 }
350 Err(err) => return Err(err),
351 };
352 match value_template {
353 None => {
354 result.insert(key, TagValue::Bare);
355 }
356 Some(template) => match render_template(template, event) {
357 Ok(value) => {
358 let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
359 .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
360 result.extend(expanded_pairs);
361 }
362 Err(TransformError::TemplateRenderingError(value_error)) => {
363 emit!(crate::internal_events::TemplateRenderingError {
364 error: value_error,
365 drop_event: false,
366 field: Some(template.get_ref()),
367 });
368 return Ok(());
369 }
370 Err(other) => return Err(other),
371 },
372 };
373 Ok(())
374}
375
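/// Builds one metric from `event` according to `config`; used when `all_metrics`
/// is not enabled. The configured `field` must exist and be non-null. Counters with
/// `increment_by_value`, histograms, summaries, and gauges parse the field value as
/// a float, while sets use its string representation.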
376fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
377 let log = event.as_log();
378
379 let timestamp = log
380 .get_timestamp()
381 .and_then(Value::as_timestamp)
382 .cloned()
383 .or_else(|| Some(Utc::now()));
384
385 let metadata = event
387 .metadata()
388 .clone()
389 .with_schema_definition(&Arc::new(Definition::any()))
390 .with_origin_metadata(DatadogMetricOriginMetadata::new(
391 None,
392 None,
393 Some(ORIGIN_SERVICE_VALUE),
394 ));
395
396 let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
397 path: config.field().to_string(),
398 })?;
399
400 let value = match log.get(&field) {
401 None => Err(TransformError::PathNotFound {
402 path: field.to_string(),
403 }),
404 Some(Value::Null) => Err(TransformError::PathNull {
405 path: field.to_string(),
406 }),
407 Some(value) => Ok(value),
408 }?;
409
410 let name = config.name.as_ref().unwrap_or(&config.field);
411 let name = render_template(name, event)?;
412
413 let namespace = config.namespace.as_ref();
414 let namespace = namespace
415 .map(|namespace| render_template(namespace, event))
416 .transpose()?;
417
418 let tags = render_tags(&config.tags, event)?;
419
420 let (kind, value) = match &config.metric {
421 MetricTypeConfig::Counter(counter) => {
422 let value = if counter.increment_by_value {
423 value.to_string_lossy().parse().map_err(|error| {
424 TransformError::ParseFloatError {
425 path: config.field.get_ref().to_owned(),
426 error,
427 }
428 })?
429 } else {
430 1.0
431 };
432
433 (counter.kind, MetricValue::Counter { value })
434 }
435 MetricTypeConfig::Histogram => {
436 let value = value.to_string_lossy().parse().map_err(|error| {
437 TransformError::ParseFloatError {
438 path: field.to_string(),
439 error,
440 }
441 })?;
442
443 (
444 MetricKind::Incremental,
445 MetricValue::Distribution {
446 samples: vector_lib::samples![value => 1],
447 statistic: StatisticKind::Histogram,
448 },
449 )
450 }
451 MetricTypeConfig::Summary => {
452 let value = value.to_string_lossy().parse().map_err(|error| {
453 TransformError::ParseFloatError {
454 path: field.to_string(),
455 error,
456 }
457 })?;
458
459 (
460 MetricKind::Incremental,
461 MetricValue::Distribution {
462 samples: vector_lib::samples![value => 1],
463 statistic: StatisticKind::Summary,
464 },
465 )
466 }
467 MetricTypeConfig::Gauge => {
468 let value = value.to_string_lossy().parse().map_err(|error| {
469 TransformError::ParseFloatError {
470 path: field.to_string(),
471 error,
472 }
473 })?;
474
475 (MetricKind::Absolute, MetricValue::Gauge { value })
476 }
477 MetricTypeConfig::Set => {
478 let value = value.to_string_lossy().into_owned();
479
480 (
481 MetricKind::Incremental,
482 MetricValue::Set {
483 values: std::iter::once(value).collect(),
484 },
485 )
486 }
487 };
488 Ok(Metric::new_with_metadata(name, kind, value, metadata)
489 .with_namespace(namespace)
490 .with_tags(tags)
491 .with_timestamp(timestamp))
492}
493
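/// Returns the value as an owned `String` when it holds valid UTF-8 bytes.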
494fn bytes_to_str(value: &Value) -> Option<String> {
495 match value {
496 Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
497 _ => None,
498 }
499}
500
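/// Looks up `path` in the log event, returning `Ok(Some(_))` when it holds a UTF-8
/// byte value, `Ok(None)` for other value types, and an error when the path is
/// missing or cannot be parsed.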
501fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
502 let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
504 PathParseError::InvalidPathSyntax { path } => PathNotFound {
505 path: path.to_string(),
506 },
507 })?;
508 match maybe_value {
509 None => Err(PathNotFound {
510 path: path.to_string(),
511 }),
512 Some(v) => Ok(bytes_to_str(v)),
513 }
514}
515
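// The `get_*_value` helpers below are used in `all_metrics` mode, where each log
// event already carries a metric representation (for example a `counter.value`
// field or a `histogram.buckets` array).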
516fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
517 let counter_value = log
518 .get(event_path!("counter", "value"))
519 .ok_or_else(|| TransformError::PathNotFound {
520 path: "counter.value".to_string(),
521 })?
522 .as_float()
523 .ok_or_else(|| TransformError::ParseError {
524 path: "counter.value".to_string(),
525 kind: TransformParseErrorKind::FloatError,
526 })?;
527
528 Ok(MetricValue::Counter {
529 value: *counter_value,
530 })
531}
532
533fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
534 let gauge_value = log
535 .get(event_path!("gauge", "value"))
536 .ok_or_else(|| TransformError::PathNotFound {
537 path: "gauge.value".to_string(),
538 })?
539 .as_float()
540 .ok_or_else(|| TransformError::ParseError {
541 path: "gauge.value".to_string(),
542 kind: TransformParseErrorKind::FloatError,
543 })?;
544 Ok(MetricValue::Gauge {
545 value: *gauge_value,
546 })
547}
548
549fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
550 let set_values = log
551 .get(event_path!("set", "values"))
552 .ok_or_else(|| TransformError::PathNotFound {
553 path: "set.values".to_string(),
554 })?
555 .as_array()
556 .ok_or_else(|| TransformError::ParseError {
557 path: "set.values".to_string(),
558 kind: TransformParseErrorKind::ArrayError,
559 })?;
560
561 let mut values: Vec<String> = Vec::new();
562 for e_value in set_values {
563 let value = e_value
564 .as_bytes()
565 .ok_or_else(|| TransformError::ParseError {
566 path: "set.values".to_string(),
567 kind: TransformParseErrorKind::ArrayError,
568 })?;
569 values.push(String::from_utf8_lossy(value).to_string());
570 }
571
572 Ok(MetricValue::Set {
573 values: values.into_iter().collect(),
574 })
575}
576
577fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
578 let event_samples = log
579 .get(event_path!("distribution", "samples"))
580 .ok_or_else(|| TransformError::PathNotFound {
581 path: "distribution.samples".to_string(),
582 })?
583 .as_array()
584 .ok_or_else(|| TransformError::ParseError {
585 path: "distribution.samples".to_string(),
586 kind: TransformParseErrorKind::ArrayError,
587 })?;
588
589 let mut samples: Vec<Sample> = Vec::new();
590 for e_sample in event_samples {
591 let value = e_sample
592 .get(path!("value"))
593 .ok_or_else(|| TransformError::PathNotFound {
594 path: "value".to_string(),
595 })?
596 .as_float()
597 .ok_or_else(|| TransformError::ParseError {
598 path: "value".to_string(),
599 kind: TransformParseErrorKind::FloatError,
600 })?;
601
602 let rate = e_sample
603 .get(path!("rate"))
604 .ok_or_else(|| TransformError::PathNotFound {
605 path: "rate".to_string(),
606 })?
607 .as_integer()
608 .ok_or_else(|| TransformError::ParseError {
609 path: "rate".to_string(),
610 kind: TransformParseErrorKind::IntError,
611 })?;
612
613 samples.push(Sample {
614 value: *value,
615 rate: rate as u32,
616 });
617 }
618
619 let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
620 Some(n) => n,
621 None => {
622 return Err(TransformError::PathNotFound {
623 path: "distribution.statistic".to_string(),
624 })
625 }
626 };
627 let statistic_kind = match statistic_str.as_str() {
628 "histogram" => Ok(StatisticKind::Histogram),
629 "summary" => Ok(StatisticKind::Summary),
630 _ => Err(TransformError::MetricValueError {
631 path: "distribution.statistic".to_string(),
632 path_value: statistic_str.to_string(),
633 }),
634 }?;
635
636 Ok(MetricValue::Distribution {
637 samples,
638 statistic: statistic_kind,
639 })
640}
641
642fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
643 let event_buckets = log
644 .get(event_path!("histogram", "buckets"))
645 .ok_or_else(|| TransformError::PathNotFound {
646 path: "histogram.buckets".to_string(),
647 })?
648 .as_array()
649 .ok_or_else(|| TransformError::ParseError {
650 path: "histogram.buckets".to_string(),
651 kind: TransformParseErrorKind::ArrayError,
652 })?;
653
654 let mut buckets: Vec<Bucket> = Vec::new();
655 for e_bucket in event_buckets {
656 let upper_limit = e_bucket
657 .get(path!("upper_limit"))
658 .ok_or_else(|| TransformError::PathNotFound {
659 path: "histogram.buckets.upper_limit".to_string(),
660 })?
661 .as_float()
662 .ok_or_else(|| TransformError::ParseError {
663 path: "histogram.buckets.upper_limit".to_string(),
664 kind: TransformParseErrorKind::FloatError,
665 })?;
666
667 let count = e_bucket
668 .get(path!("count"))
669 .ok_or_else(|| TransformError::PathNotFound {
670 path: "histogram.buckets.count".to_string(),
671 })?
672 .as_integer()
673 .ok_or_else(|| TransformError::ParseError {
674 path: "histogram.buckets.count".to_string(),
675 kind: TransformParseErrorKind::IntError,
676 })?;
677
678 buckets.push(Bucket {
679 upper_limit: *upper_limit,
680 count: count as u64,
681 });
682 }
683
684 let count = log
685 .get(event_path!("histogram", "count"))
686 .ok_or_else(|| TransformError::PathNotFound {
687 path: "histogram.count".to_string(),
688 })?
689 .as_integer()
690 .ok_or_else(|| TransformError::ParseError {
691 path: "histogram.count".to_string(),
692 kind: TransformParseErrorKind::IntError,
693 })?;
694
695 let sum = log
696 .get(event_path!("histogram", "sum"))
697 .ok_or_else(|| TransformError::PathNotFound {
698 path: "histogram.sum".to_string(),
699 })?
700 .as_float()
701 .ok_or_else(|| TransformError::ParseError {
702 path: "histogram.sum".to_string(),
703 kind: TransformParseErrorKind::FloatError,
704 })?;
705
706 Ok(MetricValue::AggregatedHistogram {
707 buckets,
708 count: count as u64,
709 sum: *sum,
710 })
711}
712
713fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
714 let event_quantiles = log
715 .get(event_path!("summary", "quantiles"))
716 .ok_or_else(|| TransformError::PathNotFound {
717 path: "summary.quantiles".to_string(),
718 })?
719 .as_array()
720 .ok_or_else(|| TransformError::ParseError {
721 path: "summary.quantiles".to_string(),
722 kind: TransformParseErrorKind::ArrayError,
723 })?;
724
725 let mut quantiles: Vec<Quantile> = Vec::new();
726 for e_quantile in event_quantiles {
727 let quantile = e_quantile
728 .get(path!("quantile"))
729 .ok_or_else(|| TransformError::PathNotFound {
730 path: "summary.quantiles.quantile".to_string(),
731 })?
732 .as_float()
733 .ok_or_else(|| TransformError::ParseError {
734 path: "summary.quantiles.quantile".to_string(),
735 kind: TransformParseErrorKind::FloatError,
736 })?;
737
738 let value = e_quantile
739 .get(path!("value"))
740 .ok_or_else(|| TransformError::PathNotFound {
741 path: "summary.quantiles.value".to_string(),
742 })?
743 .as_float()
744 .ok_or_else(|| TransformError::ParseError {
745 path: "summary.quantiles.value".to_string(),
746 kind: TransformParseErrorKind::FloatError,
747 })?;
748
749 quantiles.push(Quantile {
750 quantile: *quantile,
751 value: *value,
752 })
753 }
754
755 let count = log
756 .get(event_path!("summary", "count"))
757 .ok_or_else(|| TransformError::PathNotFound {
758 path: "summary.count".to_string(),
759 })?
760 .as_integer()
761 .ok_or_else(|| TransformError::ParseError {
762 path: "summary.count".to_string(),
763 kind: TransformParseErrorKind::IntError,
764 })?;
765
766 let sum = log
767 .get(event_path!("summary", "sum"))
768 .ok_or_else(|| TransformError::PathNotFound {
769 path: "summary.sum".to_string(),
770 })?
771 .as_float()
772 .ok_or_else(|| TransformError::ParseError {
773 path: "summary.sum".to_string(),
774 kind: TransformParseErrorKind::FloatError,
775 })?;
776
777 Ok(MetricValue::AggregatedSummary {
778 quantiles,
779 count: count as u64,
780 sum: *sum,
781 })
782}
783
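/// Builds a metric from a log event that already describes one (`all_metrics` mode).
/// The event must carry `name` and `kind` fields plus a value object such as
/// `counter`, `gauge`, `set`, `distribution`, `histogram`, or `summary`; `namespace`
/// and `tags` are optional. An illustrative event, mirroring the shape exercised by
/// the tests below:
///
/// ```json
/// {
///   "name": "test.transform.counter",
///   "kind": "incremental",
///   "namespace": "test_namespace",
///   "tags": { "env": "test_env", "host": "localhost" },
///   "counter": { "value": 10.0 }
/// }
/// ```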
784fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
785 let log = event.as_log();
786 let timestamp = log
787 .get_timestamp()
788 .and_then(Value::as_timestamp)
789 .cloned()
790 .or_else(|| Some(Utc::now()));
791
792 let name = match try_get_string_from_log(log, "name")? {
793 Some(n) => n,
794 None => {
795 return Err(TransformError::PathNotFound {
796 path: "name".to_string(),
797 })
798 }
799 };
800
801 let mut tags = MetricTags::default();
802
803 if let Some(els) = log.get(event_path!("tags")) {
804 if let Some(el) = els.as_object() {
805 for (key, value) in el {
806 tags.insert(key.to_string(), bytes_to_str(value));
807 }
808 }
809 }
810 let tags_result = Some(tags);
811
812 let kind_str = match try_get_string_from_log(log, "kind")? {
813 Some(n) => n,
814 None => {
815 return Err(TransformError::PathNotFound {
816 path: "kind".to_string(),
817 })
818 }
819 };
820
821 let kind = match kind_str.as_str() {
822 "absolute" => Ok(MetricKind::Absolute),
823 "incremental" => Ok(MetricKind::Incremental),
824 value => Err(TransformError::MetricValueError {
825 path: "kind".to_string(),
826 path_value: value.to_string(),
827 }),
828 }?;
829
830 let mut value: Option<MetricValue> = None;
831 if let Some(root_event) = log.as_map() {
832 for key in root_event.keys() {
833 value = match key.as_str() {
834 "gauge" => Some(get_gauge_value(log)?),
835 "distribution" => Some(get_distribution_value(log)?),
836 "histogram" => Some(get_histogram_value(log)?),
837 "summary" => Some(get_summary_value(log)?),
838 "counter" => Some(get_counter_value(log)?),
839 "set" => Some(get_set_value(log)?),
840 _ => None,
841 };
842
843 if value.is_some() {
844 break;
845 }
846 }
847 }
848
849 let value = value.ok_or(TransformError::MetricDetailsNotFound)?;
850
851 let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
852 .with_tags(tags_result)
853 .with_timestamp(timestamp);
854
855 if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
856 metric = metric.with_namespace(namespace);
857 }
858
859 Ok(metric)
860}
861
862impl FunctionTransform for LogToMetric {
863 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
864 let mut buffer = Vec::with_capacity(self.metrics.len());
866 if self.all_metrics {
867 match to_metrics(&event) {
868 Ok(metric) => {
869 output.push(Event::Metric(metric));
870 }
871 Err(err) => {
872 match err {
873 TransformError::MetricValueError { path, path_value } => {
874 emit!(MetricMetadataInvalidFieldValueError {
875 field: path.as_ref(),
876 field_value: path_value.as_ref()
877 })
878 }
879 TransformError::PathNotFound { path } => {
880 emit!(ParserMissingFieldError::<DROP_EVENT> {
881 field: path.as_ref()
882 })
883 }
884 TransformError::ParseError { path, kind } => {
885 emit!(MetricMetadataParseError {
886 field: path.as_ref(),
887 kind: &kind.to_string(),
888 })
889 }
890 TransformError::MetricDetailsNotFound => {
891 emit!(MetricMetadataMetricDetailsNotFoundError {})
892 }
893 TransformError::PairExpansionError { key, value, error } => {
894 emit!(crate::internal_events::PairExpansionError {
895 key: &key,
896 value: &value,
897 drop_event: true,
898 error
899 })
900 }
901 _ => {}
902 };
903 }
904 }
905 } else {
906 for config in self.metrics.iter() {
907 match to_metric_with_config(config, &event) {
908 Ok(metric) => {
909 buffer.push(Event::Metric(metric));
910 }
911 Err(err) => {
912 match err {
913 TransformError::PathNull { path } => {
914 emit!(LogToMetricFieldNullError {
915 field: path.as_ref()
916 })
917 }
918 TransformError::PathNotFound { path } => {
919 emit!(ParserMissingFieldError::<DROP_EVENT> {
920 field: path.as_ref()
921 })
922 }
923 TransformError::ParseFloatError { path, error } => {
924 emit!(LogToMetricParseFloatError {
925 field: path.as_ref(),
926 error
927 })
928 }
929 TransformError::TemplateRenderingError(error) => {
930 emit!(crate::internal_events::TemplateRenderingError {
931 error,
932 drop_event: true,
933 field: None,
934 })
935 }
936 TransformError::PairExpansionError { key, value, error } => {
937 emit!(crate::internal_events::PairExpansionError {
938 key: &key,
939 value: &value,
940 drop_event: true,
941 error
942 })
943 }
944 _ => {}
945 };
946 return;
948 }
949 }
950 }
951 }
952
953 for event in buffer {
955 output.push(event);
956 }
957 }
958}
959
960#[cfg(test)]
961mod tests {
962 use super::*;
963 use crate::test_util::components::assert_transform_compliance;
964 use crate::transforms::test::create_topology;
965 use crate::{
966 config::log_schema,
967 event::{
968 metric::{Metric, MetricKind, MetricValue, StatisticKind},
969 Event, LogEvent,
970 },
971 };
972 use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
973 use std::sync::Arc;
974 use std::time::Duration;
975 use tokio::sync::mpsc;
976 use tokio_stream::wrappers::ReceiverStream;
977 use vector_lib::config::ComponentKey;
978 use vector_lib::event::{EventMetadata, ObjectMap};
979 use vector_lib::metric_tags;
980
981 #[test]
982 fn generate_config() {
983 crate::test_util::test_generate_config::<LogToMetricConfig>();
984 }
985
986 fn parse_config(s: &str) -> LogToMetricConfig {
987 toml::from_str(s).unwrap()
988 }
989
990 fn parse_yaml_config(s: &str) -> LogToMetricConfig {
991 serde_yaml::from_str(s).unwrap()
992 }
993
994 fn ts() -> DateTime<Utc> {
995 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
996 .single()
997 .and_then(|t| t.with_nanosecond(11))
998 .expect("invalid timestamp")
999 }
1000
1001 fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
1002 let mut log = Event::Log(LogEvent::from("i am a log"));
1003 log.as_mut_log().insert(key, value);
1004 log.as_mut_log()
1005 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1006 log
1007 }
1008
1009 async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
1010 assert_transform_compliance(async move {
1011 let (tx, rx) = mpsc::channel(1);
1012 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1013 tx.send(event).await.unwrap();
1014 let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1015 .await
1016 .unwrap_or(None);
1017 drop(tx);
1018 topology.stop().await;
1019 assert_eq!(out.recv().await, None);
1020 result
1021 })
1022 .await
1023 }
1024
1025 async fn do_transform_multiple_events(
1026 config: LogToMetricConfig,
1027 event: Event,
1028 count: usize,
1029 ) -> Vec<Event> {
1030 assert_transform_compliance(async move {
1031 let (tx, rx) = mpsc::channel(1);
1032 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1033 tx.send(event).await.unwrap();
1034
1035 let mut results = vec![];
1036 for _ in 0..count {
1037 let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
1038 .await
1039 .unwrap_or(None);
1040 if let Some(event) = result {
1041 results.push(event);
1042 }
1043 }
1044
1045 drop(tx);
1046 topology.stop().await;
1047 assert_eq!(out.recv().await, None);
1048 results
1049 })
1050 .await
1051 }
1052
1053 #[tokio::test]
1054 async fn count_http_status_codes() {
1055 let config = parse_config(
1056 r#"
1057 [[metrics]]
1058 type = "counter"
1059 field = "status"
1060 "#,
1061 );
1062
1063 let event = create_event("status", "42");
1064 let mut metadata =
1065 event
1066 .metadata()
1067 .clone()
1068 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1069 None,
1070 None,
1071 Some(ORIGIN_SERVICE_VALUE),
1072 ));
1073 metadata.set_schema_definition(&Arc::new(Definition::any()));
1075 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1076 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1077 let metric = do_transform(config, event).await.unwrap();
1078
1079 assert_eq!(
1080 metric.into_metric(),
1081 Metric::new_with_metadata(
1082 "status",
1083 MetricKind::Incremental,
1084 MetricValue::Counter { value: 1.0 },
1085 metadata,
1086 )
1087 .with_timestamp(Some(ts()))
1088 );
1089 }
1090
1091 #[tokio::test]
1092 async fn count_http_requests_with_tags() {
1093 let config = parse_config(
1094 r#"
1095 [[metrics]]
1096 type = "counter"
1097 field = "message"
1098 name = "http_requests_total"
1099 namespace = "app"
1100 tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
1101 "#,
1102 );
1103
1104 let mut event = create_event("message", "i am log");
1105 event.as_mut_log().insert("method", "post");
1106 event.as_mut_log().insert("code", "200");
1107 let mut metadata =
1108 event
1109 .metadata()
1110 .clone()
1111 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1112 None,
1113 None,
1114 Some(ORIGIN_SERVICE_VALUE),
1115 ));
1116 metadata.set_schema_definition(&Arc::new(Definition::any()));
1118 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1119 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1120
1121 let metric = do_transform(config, event).await.unwrap();
1122
1123 assert_eq!(
1124 metric.into_metric(),
1125 Metric::new_with_metadata(
1126 "http_requests_total",
1127 MetricKind::Incremental,
1128 MetricValue::Counter { value: 1.0 },
1129 metadata,
1130 )
1131 .with_namespace(Some("app"))
1132 .with_tags(Some(metric_tags!(
1133 "method" => "post",
1134 "code" => "200",
1135 "host" => "localhost",
1136 )))
1137 .with_timestamp(Some(ts()))
1138 );
1139 }
1140
1141 #[tokio::test]
1142 async fn count_http_requests_with_tags_expansion() {
1143 let config = parse_config(
1144 r#"
1145 [[metrics]]
1146 type = "counter"
1147 field = "message"
1148 name = "http_requests_total"
1149 namespace = "app"
1150 tags = {"*" = "{{ dict }}"}
1151 "#,
1152 );
1153
1154 let mut event = create_event("message", "i am log");
1155 let log = event.as_mut_log();
1156
1157 let mut test_dict = ObjectMap::default();
1158 test_dict.insert("one".into(), Value::from("foo"));
1159 test_dict.insert("two".into(), Value::from("baz"));
1160 log.insert("dict", Value::from(test_dict));
1161
1162 let mut metadata =
1163 event
1164 .metadata()
1165 .clone()
1166 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1167 None,
1168 None,
1169 Some(ORIGIN_SERVICE_VALUE),
1170 ));
1171 metadata.set_schema_definition(&Arc::new(Definition::any()));
1173 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1174 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1175
1176 let metric = do_transform(config, event).await.unwrap();
1177
1178 assert_eq!(
1179 metric.into_metric(),
1180 Metric::new_with_metadata(
1181 "http_requests_total",
1182 MetricKind::Incremental,
1183 MetricValue::Counter { value: 1.0 },
1184 metadata,
1185 )
1186 .with_namespace(Some("app"))
1187 .with_tags(Some(metric_tags!(
1188 "one" => "foo",
1189 "two" => "baz",
1190 )))
1191 .with_timestamp(Some(ts()))
1192 );
1193 }
1194 #[tokio::test]
1195 async fn count_http_requests_with_colliding_dynamic_tags() {
1196 let config = parse_config(
1197 r#"
1198 [[metrics]]
1199 type = "counter"
1200 field = "message"
1201 name = "http_requests_total"
1202 namespace = "app"
1203 tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
1204 "#,
1205 );
1206
1207 let mut event = create_event("message", "i am log");
1208 let log = event.as_mut_log();
1209
1210 let mut map1 = ObjectMap::default();
1211 map1.insert("key1".into(), Value::from("val1"));
1212 log.insert("map1", Value::from(map1));
1213
1214 let mut map2 = ObjectMap::default();
1215 map2.insert("l1_key1".into(), Value::from("val2"));
1216 log.insert("map2", Value::from(map2));
1217
1218 let mut metadata =
1219 event
1220 .metadata()
1221 .clone()
1222 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1223 None,
1224 None,
1225 Some(ORIGIN_SERVICE_VALUE),
1226 ));
1227 metadata.set_schema_definition(&Arc::new(Definition::any()));
1229 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1230 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1231
1232 let metric = do_transform(config, event).await.unwrap().into_metric();
1233 let tags = metric.tags().expect("Metric should have tags");
1234
1235 assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");
1236
1237 assert_eq!(tags.iter_all().count(), 2);
1238 for (name, value) in tags.iter_all() {
1239 assert_eq!(name, "l1_key1");
1240 assert!(value == Some("val1") || value == Some("val2"));
1241 }
1242 }
1243 #[tokio::test]
1244 async fn multi_value_tags_yaml() {
1245 let config = parse_yaml_config(
1247 r#"
1248 metrics:
1249 - field: "message"
1250 type: "counter"
1251 tags:
1252 tag:
1253 - "one"
1254 - null
1255 - "two"
1256 "#,
1257 );
1258
1259 let event = create_event("message", "I am log");
1260 let metric = do_transform(config, event).await.unwrap().into_metric();
1261 let tags = metric.tags().expect("Metric should have tags");
1262
1263 assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1264
1265 assert_eq!(tags.iter_all().count(), 3);
1266 for (name, value) in tags.iter_all() {
1267 assert_eq!(name, "tag");
1268 assert!(value.is_none() || value == Some("one") || value == Some("two"));
1269 }
1270 }
1271 #[tokio::test]
1272 async fn multi_value_tags_expansion_yaml() {
1273 let config = parse_yaml_config(
1275 r#"
1276 metrics:
1277 - field: "message"
1278 type: "counter"
1279 tags:
1280 "*": "{{dict}}"
1281 "#,
1282 );
1283
1284 let mut event = create_event("message", "I am log");
1285 let log = event.as_mut_log();
1286
1287 let mut test_dict = ObjectMap::default();
1288 test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
1289 log.insert("dict", Value::from(test_dict));
1290
1291 let metric = do_transform(config, event).await.unwrap().into_metric();
1292 let tags = metric.tags().expect("Metric should have tags");
1293
1294 assert_eq!(
1295 tags.iter_single().collect::<Vec<_>>(),
1296 vec![("one", "[\"foo\",\"baz\"]")]
1297 );
1298
1299 assert_eq!(tags.iter_all().count(), 1);
1300 for (name, value) in tags.iter_all() {
1301 assert_eq!(name, "one");
1302 assert_eq!(value, Some("[\"foo\",\"baz\"]"));
1303 }
1304 }
1305
1306 #[tokio::test]
1307 async fn multi_value_tags_toml() {
1308 let config = parse_config(
1309 r#"
1310 [[metrics]]
1311 field = "message"
1312 type = "counter"
1313 [metrics.tags]
1314 tag = ["one", "two"]
1315 "#,
1316 );
1317
1318 let event = create_event("message", "I am log");
1319 let metric = do_transform(config, event).await.unwrap().into_metric();
1320 let tags = metric.tags().expect("Metric should have tags");
1321
1322 assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);
1323
1324 assert_eq!(tags.iter_all().count(), 2);
1325 for (name, value) in tags.iter_all() {
1326 assert_eq!(name, "tag");
1327 assert!(value == Some("one") || value == Some("two"));
1328 }
1329 }
1330
1331 #[tokio::test]
1332 async fn count_exceptions() {
1333 let config = parse_config(
1334 r#"
1335 [[metrics]]
1336 type = "counter"
1337 field = "backtrace"
1338 name = "exception_total"
1339 "#,
1340 );
1341
1342 let event = create_event("backtrace", "message");
1343 let mut metadata =
1344 event
1345 .metadata()
1346 .clone()
1347 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1348 None,
1349 None,
1350 Some(ORIGIN_SERVICE_VALUE),
1351 ));
1352 metadata.set_schema_definition(&Arc::new(Definition::any()));
1354 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1355 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1356
1357 let metric = do_transform(config, event).await.unwrap();
1358
1359 assert_eq!(
1360 metric.into_metric(),
1361 Metric::new_with_metadata(
1362 "exception_total",
1363 MetricKind::Incremental,
1364 MetricValue::Counter { value: 1.0 },
1365 metadata
1366 )
1367 .with_timestamp(Some(ts()))
1368 );
1369 }
1370
1371 #[tokio::test]
1372 async fn count_exceptions_no_match() {
1373 let config = parse_config(
1374 r#"
1375 [[metrics]]
1376 type = "counter"
1377 field = "backtrace"
1378 name = "exception_total"
1379 "#,
1380 );
1381
1382 let event = create_event("success", "42");
1383 assert_eq!(do_transform(config, event).await, None);
1384 }
1385
1386 #[tokio::test]
1387 async fn sum_order_amounts() {
1388 let config = parse_config(
1389 r#"
1390 [[metrics]]
1391 type = "counter"
1392 field = "amount"
1393 name = "amount_total"
1394 increment_by_value = true
1395 "#,
1396 );
1397
1398 let event = create_event("amount", "33.99");
1399 let mut metadata =
1400 event
1401 .metadata()
1402 .clone()
1403 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1404 None,
1405 None,
1406 Some(ORIGIN_SERVICE_VALUE),
1407 ));
1408 metadata.set_schema_definition(&Arc::new(Definition::any()));
1410 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1411 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1412 let metric = do_transform(config, event).await.unwrap();
1413
1414 assert_eq!(
1415 metric.into_metric(),
1416 Metric::new_with_metadata(
1417 "amount_total",
1418 MetricKind::Incremental,
1419 MetricValue::Counter { value: 33.99 },
1420 metadata,
1421 )
1422 .with_timestamp(Some(ts()))
1423 );
1424 }
1425
1426 #[tokio::test]
1427 async fn count_absolute() {
1428 let config = parse_config(
1429 r#"
1430 [[metrics]]
1431 type = "counter"
1432 field = "amount"
1433 name = "amount_total"
1434 increment_by_value = true
1435 kind = "absolute"
1436 "#,
1437 );
1438
1439 let event = create_event("amount", "33.99");
1440 let mut metadata =
1441 event
1442 .metadata()
1443 .clone()
1444 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1445 None,
1446 None,
1447 Some(ORIGIN_SERVICE_VALUE),
1448 ));
1449 metadata.set_schema_definition(&Arc::new(Definition::any()));
1451 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1452 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1453
1454 let metric = do_transform(config, event).await.unwrap();
1455
1456 assert_eq!(
1457 metric.into_metric(),
1458 Metric::new_with_metadata(
1459 "amount_total",
1460 MetricKind::Absolute,
1461 MetricValue::Counter { value: 33.99 },
1462 metadata,
1463 )
1464 .with_timestamp(Some(ts()))
1465 );
1466 }
1467
1468 #[tokio::test]
1469 async fn memory_usage_gauge() {
1470 let config = parse_config(
1471 r#"
1472 [[metrics]]
1473 type = "gauge"
1474 field = "memory_rss"
1475 name = "memory_rss_bytes"
1476 "#,
1477 );
1478
1479 let event = create_event("memory_rss", "123");
1480 let mut metadata =
1481 event
1482 .metadata()
1483 .clone()
1484 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1485 None,
1486 None,
1487 Some(ORIGIN_SERVICE_VALUE),
1488 ));
1489
1490 metadata.set_schema_definition(&Arc::new(Definition::any()));
1492
1493 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1494 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1495
1496 let metric = do_transform(config, event).await.unwrap();
1497
1498 assert_eq!(
1499 metric.into_metric(),
1500 Metric::new_with_metadata(
1501 "memory_rss_bytes",
1502 MetricKind::Absolute,
1503 MetricValue::Gauge { value: 123.0 },
1504 metadata,
1505 )
1506 .with_timestamp(Some(ts()))
1507 );
1508 }
1509
1510 #[tokio::test]
1511 async fn parse_failure() {
1512 let config = parse_config(
1513 r#"
1514 [[metrics]]
1515 type = "counter"
1516 field = "status"
1517 name = "status_total"
1518 increment_by_value = true
1519 "#,
1520 );
1521
1522 let event = create_event("status", "not a number");
1523 assert_eq!(do_transform(config, event).await, None);
1524 }
1525
1526 #[tokio::test]
1527 async fn missing_field() {
1528 let config = parse_config(
1529 r#"
1530 [[metrics]]
1531 type = "counter"
1532 field = "status"
1533 name = "status_total"
1534 "#,
1535 );
1536
1537 let event = create_event("not foo", "not a number");
1538 assert_eq!(do_transform(config, event).await, None);
1539 }
1540
1541 #[tokio::test]
1542 async fn null_field() {
1543 let config = parse_config(
1544 r#"
1545 [[metrics]]
1546 type = "counter"
1547 field = "status"
1548 name = "status_total"
1549 "#,
1550 );
1551
1552 let event = create_event("status", Value::Null);
1553 assert_eq!(do_transform(config, event).await, None);
1554 }
1555
1556 #[tokio::test]
1557 async fn multiple_metrics() {
1558 let config = parse_config(
1559 r#"
1560 [[metrics]]
1561 type = "counter"
1562 field = "status"
1563
1564 [[metrics]]
1565 type = "counter"
1566 field = "backtrace"
1567 name = "exception_total"
1568 "#,
1569 );
1570
1571 let mut event = Event::Log(LogEvent::from("i am a log"));
1572 event
1573 .as_mut_log()
1574 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1575 event.as_mut_log().insert("status", "42");
1576 event.as_mut_log().insert("backtrace", "message");
1577 let mut metadata =
1578 event
1579 .metadata()
1580 .clone()
1581 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1582 None,
1583 None,
1584 Some(ORIGIN_SERVICE_VALUE),
1585 ));
1586
1587 metadata.set_schema_definition(&Arc::new(Definition::any()));
1589 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1590 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1591
1592 let output = do_transform_multiple_events(config, event, 2).await;
1593
1594 assert_eq!(2, output.len());
1595 assert_eq!(
1596 output[0].clone().into_metric(),
1597 Metric::new_with_metadata(
1598 "status",
1599 MetricKind::Incremental,
1600 MetricValue::Counter { value: 1.0 },
1601 metadata.clone(),
1602 )
1603 .with_timestamp(Some(ts()))
1604 );
1605 assert_eq!(
1606 output[1].clone().into_metric(),
1607 Metric::new_with_metadata(
1608 "exception_total",
1609 MetricKind::Incremental,
1610 MetricValue::Counter { value: 1.0 },
1611 metadata,
1612 )
1613 .with_timestamp(Some(ts()))
1614 );
1615 }
1616
1617 #[tokio::test]
1618 async fn multiple_metrics_with_multiple_templates() {
1619 let config = parse_config(
1620 r#"
1621 [[metrics]]
1622 type = "set"
1623 field = "status"
1624 name = "{{host}}_{{worker}}_status_set"
1625
1626 [[metrics]]
1627 type = "counter"
1628 field = "backtrace"
1629 name = "{{service}}_exception_total"
1630 namespace = "{{host}}"
1631 "#,
1632 );
1633
1634 let mut event = Event::Log(LogEvent::from("i am a log"));
1635 event
1636 .as_mut_log()
1637 .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
1638 event.as_mut_log().insert("status", "42");
1639 event.as_mut_log().insert("backtrace", "message");
1640 event.as_mut_log().insert("host", "local");
1641 event.as_mut_log().insert("worker", "abc");
1642 event.as_mut_log().insert("service", "xyz");
1643 let mut metadata =
1644 event
1645 .metadata()
1646 .clone()
1647 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1648 None,
1649 None,
1650 Some(ORIGIN_SERVICE_VALUE),
1651 ));
1652
1653 metadata.set_schema_definition(&Arc::new(Definition::any()));
1655 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1656 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1657
1658 let output = do_transform_multiple_events(config, event, 2).await;
1659
1660 assert_eq!(2, output.len());
1661 assert_eq!(
1662 output[0].as_metric(),
1663 &Metric::new_with_metadata(
1664 "local_abc_status_set",
1665 MetricKind::Incremental,
1666 MetricValue::Set {
1667 values: vec!["42".into()].into_iter().collect()
1668 },
1669 metadata.clone(),
1670 )
1671 .with_timestamp(Some(ts()))
1672 );
1673 assert_eq!(
1674 output[1].as_metric(),
1675 &Metric::new_with_metadata(
1676 "xyz_exception_total",
1677 MetricKind::Incremental,
1678 MetricValue::Counter { value: 1.0 },
1679 metadata,
1680 )
1681 .with_namespace(Some("local"))
1682 .with_timestamp(Some(ts()))
1683 );
1684 }
1685
1686 #[tokio::test]
1687 async fn user_ip_set() {
1688 let config = parse_config(
1689 r#"
1690 [[metrics]]
1691 type = "set"
1692 field = "user_ip"
1693 name = "unique_user_ip"
1694 "#,
1695 );
1696
1697 let event = create_event("user_ip", "1.2.3.4");
1698 let mut metadata =
1699 event
1700 .metadata()
1701 .clone()
1702 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1703 None,
1704 None,
1705 Some(ORIGIN_SERVICE_VALUE),
1706 ));
1707 metadata.set_schema_definition(&Arc::new(Definition::any()));
1709 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1710 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1711
1712 let metric = do_transform(config, event).await.unwrap();
1713
1714 assert_eq!(
1715 metric.into_metric(),
1716 Metric::new_with_metadata(
1717 "unique_user_ip",
1718 MetricKind::Incremental,
1719 MetricValue::Set {
1720 values: vec!["1.2.3.4".into()].into_iter().collect()
1721 },
1722 metadata,
1723 )
1724 .with_timestamp(Some(ts()))
1725 );
1726 }
1727
1728 #[tokio::test]
1729 async fn response_time_histogram() {
1730 let config = parse_config(
1731 r#"
1732 [[metrics]]
1733 type = "histogram"
1734 field = "response_time"
1735 "#,
1736 );
1737
1738 let event = create_event("response_time", "2.5");
1739 let mut metadata =
1740 event
1741 .metadata()
1742 .clone()
1743 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1744 None,
1745 None,
1746 Some(ORIGIN_SERVICE_VALUE),
1747 ));
1748
1749 metadata.set_schema_definition(&Arc::new(Definition::any()));
1751 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1752 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1753
1754 let metric = do_transform(config, event).await.unwrap();
1755
1756 assert_eq!(
1757 metric.into_metric(),
1758 Metric::new_with_metadata(
1759 "response_time",
1760 MetricKind::Incremental,
1761 MetricValue::Distribution {
1762 samples: vector_lib::samples![2.5 => 1],
1763 statistic: StatisticKind::Histogram
1764 },
1765 metadata
1766 )
1767 .with_timestamp(Some(ts()))
1768 );
1769 }
1770
1771 #[tokio::test]
1772 async fn response_time_summary() {
1773 let config = parse_config(
1774 r#"
1775 [[metrics]]
1776 type = "summary"
1777 field = "response_time"
1778 "#,
1779 );
1780
1781 let event = create_event("response_time", "2.5");
1782 let mut metadata =
1783 event
1784 .metadata()
1785 .clone()
1786 .with_origin_metadata(DatadogMetricOriginMetadata::new(
1787 None,
1788 None,
1789 Some(ORIGIN_SERVICE_VALUE),
1790 ));
1791
1792 metadata.set_schema_definition(&Arc::new(Definition::any()));
1794 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1795 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1796
1797 let metric = do_transform(config, event).await.unwrap();
1798
1799 assert_eq!(
1800 metric.into_metric(),
1801 Metric::new_with_metadata(
1802 "response_time",
1803 MetricKind::Incremental,
1804 MetricValue::Distribution {
1805 samples: vector_lib::samples![2.5 => 1],
1806 statistic: StatisticKind::Summary
1807 },
1808 metadata
1809 )
1810 .with_timestamp(Some(ts()))
1811 );
1812 }
1813
1814 fn create_log_event(json_str: &str) -> Event {
1817 create_log_event_with_namespace(json_str, Some("test_namespace"))
1818 }
1819
1820 fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
1821 let mut log_value: Value =
1822 serde_json::from_str(json_str).expect("JSON was not well-formatted");
1823 log_value.insert("timestamp", ts());
1824
1825 if let Some(namespace) = namespace {
1826 log_value.insert("namespace", namespace);
1827 }
1828
1829 let mut metadata = EventMetadata::default();
1830 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
1831 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
1832
1833 Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
1834 }
1835
1836 #[tokio::test]
1837 async fn transform_gauge() {
1838 let config = LogToMetricConfig {
1839 metrics: None,
1840 all_metrics: Some(true),
1841 };
1842
1843 let json_str = r#"{
1844 "gauge": {
1845 "value": 990.0
1846 },
1847 "kind": "absolute",
1848 "name": "test.transform.gauge",
1849 "tags": {
1850 "env": "test_env",
1851 "host": "localhost"
1852 }
1853 }"#;
1854 let log = create_log_event(json_str);
1855 let metric = do_transform(config, log.clone()).await.unwrap();
1856 assert_eq!(
1857 *metric.as_metric(),
1858 Metric::new_with_metadata(
1859 "test.transform.gauge",
1860 MetricKind::Absolute,
1861 MetricValue::Gauge { value: 990.0 },
1862 metric.metadata().clone(),
1863 )
1864 .with_namespace(Some("test_namespace"))
1865 .with_tags(Some(metric_tags!(
1866 "env" => "test_env",
1867 "host" => "localhost",
1868 )))
1869 .with_timestamp(Some(ts()))
1870 );
1871 }
1872
1873 #[tokio::test]
1874 async fn transform_histogram() {
1875 let config = LogToMetricConfig {
1876 metrics: None,
1877 all_metrics: Some(true),
1878 };
1879
1880 let json_str = r#"{
1881 "histogram": {
1882 "sum": 18.0,
1883 "count": 5,
1884 "buckets": [
1885 {
1886 "upper_limit": 1.0,
1887 "count": 1
1888 },
1889 {
1890 "upper_limit": 2.0,
1891 "count": 2
1892 },
1893 {
1894 "upper_limit": 5.0,
1895 "count": 1
1896 },
1897 {
1898 "upper_limit": 10.0,
1899 "count": 1
1900 }
1901 ]
1902 },
1903 "kind": "absolute",
1904 "name": "test.transform.histogram",
1905 "tags": {
1906 "env": "test_env",
1907 "host": "localhost"
1908 }
1909 }"#;
1910 let log = create_log_event(json_str);
1911 let metric = do_transform(config, log.clone()).await.unwrap();
1912 assert_eq!(
1913 *metric.as_metric(),
1914 Metric::new_with_metadata(
1915 "test.transform.histogram",
1916 MetricKind::Absolute,
1917 MetricValue::AggregatedHistogram {
1918 count: 5,
1919 sum: 18.0,
1920 buckets: vec![
1921 Bucket {
1922 upper_limit: 1.0,
1923 count: 1,
1924 },
1925 Bucket {
1926 upper_limit: 2.0,
1927 count: 2,
1928 },
1929 Bucket {
1930 upper_limit: 5.0,
1931 count: 1,
1932 },
1933 Bucket {
1934 upper_limit: 10.0,
1935 count: 1,
1936 },
1937 ],
1938 },
1939 metric.metadata().clone(),
1940 )
1941 .with_namespace(Some("test_namespace"))
1942 .with_tags(Some(metric_tags!(
1943 "env" => "test_env",
1944 "host" => "localhost",
1945 )))
1946 .with_timestamp(Some(ts()))
1947 );
1948 }
1949
1950 #[tokio::test]
1951 async fn transform_distribution_histogram() {
1952 let config = LogToMetricConfig {
1953 metrics: None,
1954 all_metrics: Some(true),
1955 };
1956
1957 let json_str = r#"{
1958 "distribution": {
1959 "samples": [
1960 {
1961 "value": 1.0,
1962 "rate": 1
1963 },
1964 {
1965 "value": 2.0,
1966 "rate": 2
1967 }
1968 ],
1969 "statistic": "histogram"
1970 },
1971 "kind": "absolute",
1972 "name": "test.transform.distribution_histogram",
1973 "tags": {
1974 "env": "test_env",
1975 "host": "localhost"
1976 }
1977 }"#;
1978 let log = create_log_event(json_str);
1979 let metric = do_transform(config, log.clone()).await.unwrap();
1980 assert_eq!(
1981 *metric.as_metric(),
1982 Metric::new_with_metadata(
1983 "test.transform.distribution_histogram",
1984 MetricKind::Absolute,
1985 MetricValue::Distribution {
1986 samples: vec![
1987 Sample {
1988 value: 1.0,
1989 rate: 1
1990 },
1991 Sample {
1992 value: 2.0,
1993 rate: 2
1994 },
1995 ],
1996 statistic: StatisticKind::Histogram,
1997 },
1998 metric.metadata().clone(),
1999 )
2000 .with_namespace(Some("test_namespace"))
2001 .with_tags(Some(metric_tags!(
2002 "env" => "test_env",
2003 "host" => "localhost",
2004 )))
2005 .with_timestamp(Some(ts()))
2006 );
2007 }
2008
2009 #[tokio::test]
2010 async fn transform_distribution_summary() {
2011 let config = LogToMetricConfig {
2012 metrics: None,
2013 all_metrics: Some(true),
2014 };
2015
2016 let json_str = r#"{
2017 "distribution": {
2018 "samples": [
2019 {
2020 "value": 1.0,
2021 "rate": 1
2022 },
2023 {
2024 "value": 2.0,
2025 "rate": 2
2026 }
2027 ],
2028 "statistic": "summary"
2029 },
2030 "kind": "absolute",
2031 "name": "test.transform.distribution_summary",
2032 "tags": {
2033 "env": "test_env",
2034 "host": "localhost"
2035 }
2036 }"#;
2037 let log = create_log_event(json_str);
2038 let metric = do_transform(config, log.clone()).await.unwrap();
2039 assert_eq!(
2040 *metric.as_metric(),
2041 Metric::new_with_metadata(
2042 "test.transform.distribution_summary",
2043 MetricKind::Absolute,
2044 MetricValue::Distribution {
2045 samples: vec![
2046 Sample {
2047 value: 1.0,
2048 rate: 1
2049 },
2050 Sample {
2051 value: 2.0,
2052 rate: 2
2053 },
2054 ],
2055 statistic: StatisticKind::Summary,
2056 },
2057 metric.metadata().clone(),
2058 )
2059 .with_namespace(Some("test_namespace"))
2060 .with_tags(Some(metric_tags!(
2061 "env" => "test_env",
2062 "host" => "localhost",
2063 )))
2064 .with_timestamp(Some(ts()))
2065 );
2066 }
2067
2068 #[tokio::test]
2069 async fn transform_summary() {
2070 let config = LogToMetricConfig {
2071 metrics: None,
2072 all_metrics: Some(true),
2073 };
2074
2075 let json_str = r#"{
2076 "summary": {
2077 "sum": 100.0,
2078 "count": 7,
2079 "quantiles": [
2080 {
2081 "quantile": 0.05,
2082 "value": 10.0
2083 },
2084 {
2085 "quantile": 0.95,
2086 "value": 25.0
2087 }
2088 ]
2089 },
2090 "kind": "absolute",
2091 "name": "test.transform.histogram",
2092 "tags": {
2093 "env": "test_env",
2094 "host": "localhost"
2095 }
2096 }"#;
2097 let log = create_log_event(json_str);
2098 let metric = do_transform(config, log.clone()).await.unwrap();
2099 assert_eq!(
2100 *metric.as_metric(),
2101 Metric::new_with_metadata(
2102 "test.transform.histogram",
2103 MetricKind::Absolute,
2104 MetricValue::AggregatedSummary {
2105 quantiles: vec![
2106 Quantile {
2107 quantile: 0.05,
2108 value: 10.0,
2109 },
2110 Quantile {
2111 quantile: 0.95,
2112 value: 25.0,
2113 },
2114 ],
2115 count: 7,
2116 sum: 100.0,
2117 },
2118 metric.metadata().clone(),
2119 )
2120 .with_namespace(Some("test_namespace"))
2121 .with_tags(Some(metric_tags!(
2122 "env" => "test_env",
2123 "host" => "localhost",
2124 )))
2125 .with_timestamp(Some(ts()))
2126 );
2127 }
2128
2129 #[tokio::test]
2130 async fn transform_counter() {
2131 let config = LogToMetricConfig {
2132 metrics: None,
2133 all_metrics: Some(true),
2134 };
2135
2136 let json_str = r#"{
2137 "counter": {
2138 "value": 10.0
2139 },
2140 "kind": "incremental",
2141 "name": "test.transform.counter",
2142 "tags": {
2143 "env": "test_env",
2144 "host": "localhost"
2145 }
2146 }"#;
2147 let log = create_log_event(json_str);
2148 let metric = do_transform(config, log.clone()).await.unwrap();
2149 assert_eq!(
2150 *metric.as_metric(),
2151 Metric::new_with_metadata(
2152 "test.transform.counter",
2153 MetricKind::Incremental,
2154 MetricValue::Counter { value: 10.0 },
2155 metric.metadata().clone(),
2156 )
2157 .with_namespace(Some("test_namespace"))
2158 .with_tags(Some(metric_tags!(
2159 "env" => "test_env",
2160 "host" => "localhost",
2161 )))
2162 .with_timestamp(Some(ts()))
2163 );
2164 }
2165
2166 #[tokio::test]
2167 async fn transform_set() {
2168 let config = LogToMetricConfig {
2169 metrics: None,
2170 all_metrics: Some(true),
2171 };
2172
2173 let json_str = r#"{
2174 "set": {
2175 "values": ["990.0", "1234"]
2176 },
2177 "kind": "incremental",
2178 "name": "test.transform.set",
2179 "tags": {
2180 "env": "test_env",
2181 "host": "localhost"
2182 }
2183 }"#;
2184 let log = create_log_event(json_str);
2185 let metric = do_transform(config, log.clone()).await.unwrap();
2186 assert_eq!(
2187 *metric.as_metric(),
2188 Metric::new_with_metadata(
2189 "test.transform.set",
2190 MetricKind::Incremental,
2191 MetricValue::Set {
2192 values: vec!["990.0".into(), "1234".into()].into_iter().collect()
2193 },
2194 metric.metadata().clone(),
2195 )
2196 .with_namespace(Some("test_namespace"))
2197 .with_tags(Some(metric_tags!(
2198 "env" => "test_env",
2199 "host" => "localhost",
2200 )))
2201 .with_timestamp(Some(ts()))
2202 );
2203 }
2204
2205 #[tokio::test]
2206 async fn transform_all_metrics_optional_namespace() {
2207 let config = LogToMetricConfig {
2208 metrics: None,
2209 all_metrics: Some(true),
2210 };
2211
2212 let json_str = r#"{
2213 "counter": {
2214 "value": 10.0
2215 },
2216 "kind": "incremental",
2217 "name": "test.transform.counter",
2218 "tags": {
2219 "env": "test_env",
2220 "host": "localhost"
2221 }
2222 }"#;
2223 let log = create_log_event_with_namespace(json_str, None);
2224 let metric = do_transform(config, log.clone()).await.unwrap();
2225 assert_eq!(
2226 *metric.as_metric(),
2227 Metric::new_with_metadata(
2228 "test.transform.counter",
2229 MetricKind::Incremental,
2230 MetricValue::Counter { value: 10.0 },
2231 metric.metadata().clone(),
2232 )
2233 .with_tags(Some(metric_tags!(
2234 "env" => "test_env",
2235 "host" => "localhost",
2236 )))
2237 .with_timestamp(Some(ts()))
2238 );
2239 }
2240}