use std::{collections::HashMap, num::ParseFloatError, sync::Arc};

use chrono::Utc;
use indexmap::IndexMap;
use vector_lib::{
    configurable::configurable_component,
    event::{
        DatadogMetricOriginMetadata, LogEvent,
        metric::{Bucket, Quantile, Sample},
    },
};
use vrl::{
    event_path, path,
    path::{PathParseError, parse_target_path},
};

use crate::{
    common::expansion::pair_expansion,
    config::{
        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
        TransformOutput, schema::Definition,
    },
    event::{
        Event, Value,
        metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind, TagValue},
    },
    internal_events::{
        DROP_EVENT, LogToMetricFieldNullError, LogToMetricParseFloatError,
        MetricMetadataInvalidFieldValueError, MetricMetadataMetricDetailsNotFoundError,
        MetricMetadataParseError, ParserMissingFieldError,
    },
    schema,
    template::{Template, TemplateRenderingError},
    transforms::{
        FunctionTransform, OutputBuffer, Transform, log_to_metric::TransformError::PathNotFound,
    },
};

const ORIGIN_SERVICE_VALUE: u32 = 3;

/// Configuration for the `log_to_metric` transform.
#[configurable_component(transform("log_to_metric", "Convert log events to metric events."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LogToMetricConfig {
    /// A list of metrics to generate.
    pub metrics: Option<Vec<MetricConfig>>,

    /// When enabled, all incoming log events are converted using the metric event schema
    /// and the `metrics` list is ignored.
    pub all_metrics: Option<bool>,
}

/// Specification of a counter derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct CounterConfig {
    /// Increments the counter by the value at `field`, instead of only by `1`.
    #[serde(default = "default_increment_by_value")]
    pub increment_by_value: bool,

    #[configurable(derived)]
    #[serde(default = "default_kind")]
    pub kind: MetricKind,
}

/// Specification of a metric derived from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct MetricConfig {
    /// The log field to use as the source of the metric value (and the default metric name).
    pub field: Template,

    /// Overrides the name of the metric. If unspecified, `field` is used as the name.
    pub name: Option<Template>,

    /// Sets the namespace for the metric.
    pub namespace: Option<Template>,

    /// Tags to apply to the metric.
    #[configurable(metadata(docs::additional_props_description = "A metric tag."))]
    pub tags: Option<IndexMap<Template, TagConfig>>,

    #[configurable(derived)]
    #[serde(flatten)]
    pub metric: MetricTypeConfig,
}

/// Specification of the value of a created tag.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged)]
pub enum TagConfig {
    /// A single tag value.
    Plain(Option<Template>),

    /// A list of tag values.
    Multi(Vec<Option<Template>>),
}

/// Specification of the type of metric to create.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of metric to create."))]
pub enum MetricTypeConfig {
    /// A counter.
    Counter(CounterConfig),

    /// A histogram.
    Histogram,

    /// A gauge.
    Gauge,

    /// A set.
    Set,

    /// A summary.
    Summary,
}

impl MetricConfig {
    fn field(&self) -> &str {
        self.field.get_ref()
    }
}

const fn default_increment_by_value() -> bool {
    false
}

const fn default_kind() -> MetricKind {
    MetricKind::Incremental
}

#[derive(Debug, Clone)]
pub struct LogToMetric {
    pub metrics: Vec<MetricConfig>,
    pub all_metrics: bool,
}

impl GenerateConfig for LogToMetricConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            metrics: Some(vec![MetricConfig {
                field: "field_name".try_into().expect("Fixed template"),
                name: None,
                namespace: None,
                tags: None,
                metric: MetricTypeConfig::Counter(CounterConfig {
                    increment_by_value: false,
                    kind: MetricKind::Incremental,
                }),
            }]),
            all_metrics: Some(true),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "log_to_metric")]
impl TransformConfig for LogToMetricConfig {
    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
        Ok(Transform::function(LogToMetric {
            metrics: self.metrics.clone().unwrap_or_default(),
            all_metrics: self.all_metrics.unwrap_or_default(),
        }))
    }

    fn input(&self) -> Input {
        Input::log()
    }

    fn outputs(
        &self,
        _: &TransformContext,
        _: &[(OutputId, schema::Definition)],
    ) -> Vec<TransformOutput> {
        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
    }

    fn enable_concurrency(&self) -> bool {
        true
    }
}

/// The kind of parse error encountered when decoding metric metadata from a log event.
#[configurable_component]
#[derive(Clone, Debug)]
pub enum TransformParseErrorKind {
    /// The value could not be parsed as a float.
    FloatError,
    /// The value could not be parsed as an integer.
    IntError,
    /// The value could not be parsed as an array.
    ArrayError,
}

impl std::fmt::Display for TransformParseErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}

enum TransformError {
    PathNotFound {
        path: String,
    },
    PathNull {
        path: String,
    },
    MetricDetailsNotFound,
    MetricValueError {
        path: String,
        path_value: String,
    },
    ParseError {
        path: String,
        kind: TransformParseErrorKind,
    },
    ParseFloatError {
        path: String,
        error: ParseFloatError,
    },
    TemplateRenderingError(TemplateRenderingError),
    PairExpansionError {
        key: String,
        value: String,
        error: serde_json::Error,
    },
}

fn render_template(template: &Template, event: &Event) -> Result<String, TransformError> {
    template
        .render_string(event)
        .map_err(TransformError::TemplateRenderingError)
}

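// Renders the configured tag templates against `event`. Plain keys and wildcard
// ("*") keys are expanded via `pair_expansion`; per the warning below, a statically
// named tag value takes precedence over an expanded one when their keys collide.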
fn render_tags(
    tags: &Option<IndexMap<Template, TagConfig>>,
    event: &Event,
) -> Result<Option<MetricTags>, TransformError> {
    let mut static_tags: HashMap<String, String> = HashMap::new();
    let mut dynamic_tags: HashMap<String, String> = HashMap::new();
    Ok(match tags {
        None => None,
        Some(tags) => {
            let mut result = MetricTags::default();
            for (name, config) in tags {
                match config {
                    TagConfig::Plain(template) => {
                        render_tag_into(
                            event,
                            name,
                            template.as_ref(),
                            &mut result,
                            &mut static_tags,
                            &mut dynamic_tags,
                        )?;
                    }
                    TagConfig::Multi(vec) => {
                        for template in vec {
                            render_tag_into(
                                event,
                                name,
                                template.as_ref(),
                                &mut result,
                                &mut static_tags,
                                &mut dynamic_tags,
                            )?;
                        }
                    }
                }
            }
            for (k, v) in static_tags {
                if let Some(discarded_v) = dynamic_tags.insert(k.clone(), v.clone()) {
                    warn!(
                        "Static tags override dynamic tags. \
                        key: {}, value: {:?}, discarded value: {:?}",
                        k, v, discarded_v
                    );
                };
            }
            result.as_option()
        }
    })
}

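// Renders a single key/value tag template pair into `result`. Template rendering
// failures are reported via internal events and the tag is skipped rather than
// failing the whole event.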
fn render_tag_into(
    event: &Event,
    key_template: &Template,
    value_template: Option<&Template>,
    result: &mut MetricTags,
    static_tags: &mut HashMap<String, String>,
    dynamic_tags: &mut HashMap<String, String>,
) -> Result<(), TransformError> {
    let key = match render_template(key_template, event) {
        Ok(key_s) => key_s,
        Err(TransformError::TemplateRenderingError(err)) => {
            emit!(crate::internal_events::TemplateRenderingError {
                error: err,
                drop_event: false,
                field: Some(key_template.get_ref()),
            });
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    match value_template {
        None => {
            result.insert(key, TagValue::Bare);
        }
        Some(template) => match render_template(template, event) {
            Ok(value) => {
                let expanded_pairs = pair_expansion(&key, &value, static_tags, dynamic_tags)
                    .map_err(|error| TransformError::PairExpansionError { key, value, error })?;
                result.extend(expanded_pairs);
            }
            Err(TransformError::TemplateRenderingError(value_error)) => {
                emit!(crate::internal_events::TemplateRenderingError {
                    error: value_error,
                    drop_event: false,
                    field: Some(template.get_ref()),
                });
                return Ok(());
            }
            Err(other) => return Err(other),
        },
    };
    Ok(())
}

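// Builds one metric from `event` according to a single `MetricConfig`: the
// configured field supplies the value, while the name, namespace, and tags are
// rendered from their templates.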
fn to_metric_with_config(config: &MetricConfig, event: &Event) -> Result<Metric, TransformError> {
    let log = event.as_log();

    let timestamp = log
        .get_timestamp()
        .and_then(Value::as_timestamp)
        .cloned()
        .or_else(|| Some(Utc::now()));

    let metadata = event
        .metadata()
        .clone()
        .with_schema_definition(&Arc::new(Definition::any()))
        .with_origin_metadata(DatadogMetricOriginMetadata::new(
            None,
            None,
            Some(ORIGIN_SERVICE_VALUE),
        ));

    let field = parse_target_path(config.field()).map_err(|_e| PathNotFound {
        path: config.field().to_string(),
    })?;

    let value = match log.get(&field) {
        None => Err(TransformError::PathNotFound {
            path: field.to_string(),
        }),
        Some(Value::Null) => Err(TransformError::PathNull {
            path: field.to_string(),
        }),
        Some(value) => Ok(value),
    }?;

    let name = config.name.as_ref().unwrap_or(&config.field);
    let name = render_template(name, event)?;

    let namespace = config.namespace.as_ref();
    let namespace = namespace
        .map(|namespace| render_template(namespace, event))
        .transpose()?;

    let tags = render_tags(&config.tags, event)?;

    let (kind, value) = match &config.metric {
        MetricTypeConfig::Counter(counter) => {
            let value = if counter.increment_by_value {
                value.to_string_lossy().parse().map_err(|error| {
                    TransformError::ParseFloatError {
                        path: config.field.get_ref().to_owned(),
                        error,
                    }
                })?
            } else {
                1.0
            };

            (counter.kind, MetricValue::Counter { value })
        }
        MetricTypeConfig::Histogram => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            (
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![value => 1],
                    statistic: StatisticKind::Histogram,
                },
            )
        }
        MetricTypeConfig::Summary => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            (
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![value => 1],
                    statistic: StatisticKind::Summary,
                },
            )
        }
        MetricTypeConfig::Gauge => {
            let value = value.to_string_lossy().parse().map_err(|error| {
                TransformError::ParseFloatError {
                    path: field.to_string(),
                    error,
                }
            })?;

            (MetricKind::Absolute, MetricValue::Gauge { value })
        }
        MetricTypeConfig::Set => {
            let value = value.to_string_lossy().into_owned();

            (
                MetricKind::Incremental,
                MetricValue::Set {
                    values: std::iter::once(value).collect(),
                },
            )
        }
    };
    Ok(Metric::new_with_metadata(name, kind, value, metadata)
        .with_namespace(namespace)
        .with_tags(tags)
        .with_timestamp(timestamp))
}

fn bytes_to_str(value: &Value) -> Option<String> {
    match value {
        Value::Bytes(bytes) => std::str::from_utf8(bytes).ok().map(|s| s.to_string()),
        _ => None,
    }
}

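// Looks up `path` in the log event, returning its value as a UTF-8 string (or
// `None` if the value is not a string), and erring if the path is missing or invalid.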
fn try_get_string_from_log(log: &LogEvent, path: &str) -> Result<Option<String>, TransformError> {
    let maybe_value = log.parse_path_and_get_value(path).map_err(|e| match e {
        PathParseError::InvalidPathSyntax { path } => PathNotFound {
            path: path.to_string(),
        },
    })?;
    match maybe_value {
        None => Err(PathNotFound {
            path: path.to_string(),
        }),
        Some(v) => Ok(bytes_to_str(v)),
    }
}

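// The `get_*_value` helpers below decode a `MetricValue` from a log event that is
// already shaped like a metric; they back the `all_metrics` mode implemented in
// `to_metrics`.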
fn get_counter_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let counter_value = log
        .get(event_path!("counter", "value"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "counter.value".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "counter.value".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;

    Ok(MetricValue::Counter {
        value: *counter_value,
    })
}

fn get_gauge_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let gauge_value = log
        .get(event_path!("gauge", "value"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "gauge.value".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "gauge.value".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;
    Ok(MetricValue::Gauge {
        value: *gauge_value,
    })
}

fn get_set_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let set_values = log
        .get(event_path!("set", "values"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "set.values".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "set.values".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut values: Vec<String> = Vec::new();
    for e_value in set_values {
        let value = e_value
            .as_bytes()
            .ok_or_else(|| TransformError::ParseError {
                path: "set.values".to_string(),
                kind: TransformParseErrorKind::ArrayError,
            })?;
        values.push(String::from_utf8_lossy(value).to_string());
    }

    Ok(MetricValue::Set {
        values: values.into_iter().collect(),
    })
}

fn get_distribution_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_samples = log
        .get(event_path!("distribution", "samples"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "distribution.samples".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "distribution.samples".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut samples: Vec<Sample> = Vec::new();
    for e_sample in event_samples {
        let value = e_sample
            .get(path!("value"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "value".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "value".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let rate = e_sample
            .get(path!("rate"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "rate".to_string(),
            })?
            .as_integer()
            .ok_or_else(|| TransformError::ParseError {
                path: "rate".to_string(),
                kind: TransformParseErrorKind::IntError,
            })?;

        samples.push(Sample {
            value: *value,
            rate: rate as u32,
        });
    }

    let statistic_str = match try_get_string_from_log(log, "distribution.statistic")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "distribution.statistic".to_string(),
            });
        }
    };
    let statistic_kind = match statistic_str.as_str() {
        "histogram" => Ok(StatisticKind::Histogram),
        "summary" => Ok(StatisticKind::Summary),
        _ => Err(TransformError::MetricValueError {
            path: "distribution.statistic".to_string(),
            path_value: statistic_str.to_string(),
        }),
    }?;

    Ok(MetricValue::Distribution {
        samples,
        statistic: statistic_kind,
    })
}

fn get_histogram_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_buckets = log
        .get(event_path!("histogram", "buckets"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "histogram.buckets".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "histogram.buckets".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut buckets: Vec<Bucket> = Vec::new();
    for e_bucket in event_buckets {
        let upper_limit = e_bucket
            .get(path!("upper_limit"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "histogram.buckets.upper_limit".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "histogram.buckets.upper_limit".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let count = e_bucket
            .get(path!("count"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "histogram.buckets.count".to_string(),
            })?
            .as_integer()
            .ok_or_else(|| TransformError::ParseError {
                path: "histogram.buckets.count".to_string(),
                kind: TransformParseErrorKind::IntError,
            })?;

        buckets.push(Bucket {
            upper_limit: *upper_limit,
            count: count as u64,
        });
    }

    let count = log
        .get(event_path!("histogram", "count"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "histogram.count".to_string(),
        })?
        .as_integer()
        .ok_or_else(|| TransformError::ParseError {
            path: "histogram.count".to_string(),
            kind: TransformParseErrorKind::IntError,
        })?;

    let sum = log
        .get(event_path!("histogram", "sum"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "histogram.sum".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "histogram.sum".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;

    Ok(MetricValue::AggregatedHistogram {
        buckets,
        count: count as u64,
        sum: *sum,
    })
}

fn get_summary_value(log: &LogEvent) -> Result<MetricValue, TransformError> {
    let event_quantiles = log
        .get(event_path!("summary", "quantiles"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "summary.quantiles".to_string(),
        })?
        .as_array()
        .ok_or_else(|| TransformError::ParseError {
            path: "summary.quantiles".to_string(),
            kind: TransformParseErrorKind::ArrayError,
        })?;

    let mut quantiles: Vec<Quantile> = Vec::new();
    for e_quantile in event_quantiles {
        let quantile = e_quantile
            .get(path!("quantile"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "summary.quantiles.quantile".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "summary.quantiles.quantile".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        let value = e_quantile
            .get(path!("value"))
            .ok_or_else(|| TransformError::PathNotFound {
                path: "summary.quantiles.value".to_string(),
            })?
            .as_float()
            .ok_or_else(|| TransformError::ParseError {
                path: "summary.quantiles.value".to_string(),
                kind: TransformParseErrorKind::FloatError,
            })?;

        quantiles.push(Quantile {
            quantile: *quantile,
            value: *value,
        })
    }

    let count = log
        .get(event_path!("summary", "count"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "summary.count".to_string(),
        })?
        .as_integer()
        .ok_or_else(|| TransformError::ParseError {
            path: "summary.count".to_string(),
            kind: TransformParseErrorKind::IntError,
        })?;

    let sum = log
        .get(event_path!("summary", "sum"))
        .ok_or_else(|| TransformError::PathNotFound {
            path: "summary.sum".to_string(),
        })?
        .as_float()
        .ok_or_else(|| TransformError::ParseError {
            path: "summary.sum".to_string(),
            kind: TransformParseErrorKind::FloatError,
        })?;

    Ok(MetricValue::AggregatedSummary {
        quantiles,
        count: count as u64,
        sum: *sum,
    })
}

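// Converts a log event that already follows the metric event schema (`name`,
// `kind`, optional `namespace` and `tags`, plus one value object such as `counter`,
// `gauge`, `set`, `distribution`, `histogram`, or `summary`) into a `Metric`.
// Used when `all_metrics` is enabled.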
fn to_metrics(event: &Event) -> Result<Metric, TransformError> {
    let log = event.as_log();
    let timestamp = log
        .get_timestamp()
        .and_then(Value::as_timestamp)
        .cloned()
        .or_else(|| Some(Utc::now()));

    let name = match try_get_string_from_log(log, "name")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "name".to_string(),
            });
        }
    };

    let mut tags = MetricTags::default();

    if let Some(els) = log.get(event_path!("tags"))
        && let Some(el) = els.as_object()
    {
        for (key, value) in el {
            tags.insert(key.to_string(), bytes_to_str(value));
        }
    }
    let tags_result = Some(tags);

    let kind_str = match try_get_string_from_log(log, "kind")? {
        Some(n) => n,
        None => {
            return Err(TransformError::PathNotFound {
                path: "kind".to_string(),
            });
        }
    };

    let kind = match kind_str.as_str() {
        "absolute" => Ok(MetricKind::Absolute),
        "incremental" => Ok(MetricKind::Incremental),
        value => Err(TransformError::MetricValueError {
            path: "kind".to_string(),
            path_value: value.to_string(),
        }),
    }?;

    let mut value: Option<MetricValue> = None;
    if let Some(root_event) = log.as_map() {
        for key in root_event.keys() {
            value = match key.as_str() {
                "gauge" => Some(get_gauge_value(log)?),
                "distribution" => Some(get_distribution_value(log)?),
                "histogram" => Some(get_histogram_value(log)?),
                "summary" => Some(get_summary_value(log)?),
                "counter" => Some(get_counter_value(log)?),
                "set" => Some(get_set_value(log)?),
                _ => None,
            };

            if value.is_some() {
                break;
            }
        }
    }

    let value = value.ok_or(TransformError::MetricDetailsNotFound)?;

    let mut metric = Metric::new_with_metadata(name, kind, value, log.metadata().clone())
        .with_tags(tags_result)
        .with_timestamp(timestamp);

    if let Ok(namespace) = try_get_string_from_log(log, "namespace") {
        metric = metric.with_namespace(namespace);
    }

    Ok(metric)
}

impl FunctionTransform for LogToMetric {
    fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
        let mut buffer = Vec::with_capacity(self.metrics.len());
        if self.all_metrics {
            match to_metrics(&event) {
                Ok(metric) => {
                    output.push(Event::Metric(metric));
                }
                Err(err) => {
                    match err {
                        TransformError::MetricValueError { path, path_value } => {
                            emit!(MetricMetadataInvalidFieldValueError {
                                field: path.as_ref(),
                                field_value: path_value.as_ref()
                            })
                        }
                        TransformError::PathNotFound { path } => {
                            emit!(ParserMissingFieldError::<DROP_EVENT> {
                                field: path.as_ref()
                            })
                        }
                        TransformError::ParseError { path, kind } => {
                            emit!(MetricMetadataParseError {
                                field: path.as_ref(),
                                kind: &kind.to_string(),
                            })
                        }
                        TransformError::MetricDetailsNotFound => {
                            emit!(MetricMetadataMetricDetailsNotFoundError {})
                        }
                        TransformError::PairExpansionError { key, value, error } => {
                            emit!(crate::internal_events::PairExpansionError {
                                key: &key,
                                value: &value,
                                drop_event: true,
                                error
                            })
                        }
                        _ => {}
                    };
                }
            }
        } else {
            for config in self.metrics.iter() {
                match to_metric_with_config(config, &event) {
                    Ok(metric) => {
                        buffer.push(Event::Metric(metric));
                    }
                    Err(err) => {
                        match err {
                            TransformError::PathNull { path } => {
                                emit!(LogToMetricFieldNullError {
                                    field: path.as_ref()
                                })
                            }
                            TransformError::PathNotFound { path } => {
                                emit!(ParserMissingFieldError::<DROP_EVENT> {
                                    field: path.as_ref()
                                })
                            }
                            TransformError::ParseFloatError { path, error } => {
                                emit!(LogToMetricParseFloatError {
                                    field: path.as_ref(),
                                    error
                                })
                            }
                            TransformError::TemplateRenderingError(error) => {
                                emit!(crate::internal_events::TemplateRenderingError {
                                    error,
                                    drop_event: true,
                                    field: None,
                                })
                            }
                            TransformError::PairExpansionError { key, value, error } => {
                                emit!(crate::internal_events::PairExpansionError {
                                    key: &key,
                                    value: &value,
                                    drop_event: true,
                                    error
                                })
                            }
                            _ => {}
                        };
                        return;
                    }
                }
            }
        }

        for event in buffer {
            output.push(event);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        config::ComponentKey,
        event::{EventMetadata, ObjectMap},
        metric_tags,
    };

    use super::*;
    use crate::{
        config::log_schema,
        event::{
            Event, LogEvent,
            metric::{Metric, MetricKind, MetricValue, StatisticKind},
        },
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogToMetricConfig>();
    }

    fn parse_config(s: &str) -> LogToMetricConfig {
        toml::from_str(s).unwrap()
    }

    fn parse_yaml_config(s: &str) -> LogToMetricConfig {
        serde_yaml::from_str(s).unwrap()
    }

    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp")
    }

    fn create_event(key: &str, value: impl Into<Value> + std::fmt::Debug) -> Event {
        let mut log = Event::Log(LogEvent::from("i am a log"));
        log.as_mut_log().insert(key, value);
        log.as_mut_log()
            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
        log
    }

    async fn do_transform(config: LogToMetricConfig, event: Event) -> Option<Event> {
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
            tx.send(event).await.unwrap();
            let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
                .await
                .unwrap_or(None);
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
            result
        })
        .await
    }

    async fn do_transform_multiple_events(
        config: LogToMetricConfig,
        event: Event,
        count: usize,
    ) -> Vec<Event> {
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
            tx.send(event).await.unwrap();

            let mut results = vec![];
            for _ in 0..count {
                let result = tokio::time::timeout(Duration::from_secs(5), out.recv())
                    .await
                    .unwrap_or(None);
                if let Some(event) = result {
                    results.push(event);
                }
            }

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
            results
        })
        .await
    }

    #[tokio::test]
    async fn count_http_status_codes() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"
            "#,
        );

        let event = create_event("status", "42");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "status",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn count_http_requests_with_tags() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {method = "{{method}}", code = "{{code}}", missing_tag = "{{unknown}}", host = "localhost"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        event.as_mut_log().insert("method", "post");
        event.as_mut_log().insert("code", "200");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "http_requests_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("app"))
            .with_tags(Some(metric_tags!(
                "method" => "post",
                "code" => "200",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn count_http_requests_with_tags_expansion() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {"*" = "{{ dict }}"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        let log = event.as_mut_log();

        let mut test_dict = ObjectMap::default();
        test_dict.insert("one".into(), Value::from("foo"));
        test_dict.insert("two".into(), Value::from("baz"));
        log.insert("dict", Value::from(test_dict));

        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "http_requests_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("app"))
            .with_tags(Some(metric_tags!(
                "one" => "foo",
                "two" => "baz",
            )))
            .with_timestamp(Some(ts()))
        );
    }
    #[tokio::test]
    async fn count_http_requests_with_colliding_dynamic_tags() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "message"
            name = "http_requests_total"
            namespace = "app"
            tags = {"l1_*" = "{{ map1 }}", "*" = "{{ map2 }}"}
            "#,
        );

        let mut event = create_event("message", "i am log");
        let log = event.as_mut_log();

        let mut map1 = ObjectMap::default();
        map1.insert("key1".into(), Value::from("val1"));
        log.insert("map1", Value::from(map1));

        let mut map2 = ObjectMap::default();
        map2.insert("l1_key1".into(), Value::from("val2"));
        log.insert("map2", Value::from(map2));

        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(tags.iter_single().collect::<Vec<_>>()[0].0, "l1_key1");

        assert_eq!(tags.iter_all().count(), 2);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "l1_key1");
            assert!(value == Some("val1") || value == Some("val2"));
        }
    }
    #[tokio::test]
    async fn multi_value_tags_yaml() {
        let config = parse_yaml_config(
            r#"
            metrics:
            - field: "message"
              type: "counter"
              tags:
                tag:
                - "one"
                - null
                - "two"
            "#,
        );

        let event = create_event("message", "I am log");
        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);

        assert_eq!(tags.iter_all().count(), 3);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "tag");
            assert!(value.is_none() || value == Some("one") || value == Some("two"));
        }
    }
    #[tokio::test]
    async fn multi_value_tags_expansion_yaml() {
        let config = parse_yaml_config(
            r#"
            metrics:
            - field: "message"
              type: "counter"
              tags:
                "*": "{{dict}}"
            "#,
        );

        let mut event = create_event("message", "I am log");
        let log = event.as_mut_log();

        let mut test_dict = ObjectMap::default();
        test_dict.insert("one".into(), Value::from(vec!["foo", "baz"]));
        log.insert("dict", Value::from(test_dict));

        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(
            tags.iter_single().collect::<Vec<_>>(),
            vec![("one", "[\"foo\",\"baz\"]")]
        );

        assert_eq!(tags.iter_all().count(), 1);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "one");
            assert_eq!(value, Some("[\"foo\",\"baz\"]"));
        }
    }

    #[tokio::test]
    async fn multi_value_tags_toml() {
        let config = parse_config(
            r#"
            [[metrics]]
            field = "message"
            type = "counter"
            [metrics.tags]
            tag = ["one", "two"]
            "#,
        );

        let event = create_event("message", "I am log");
        let metric = do_transform(config, event).await.unwrap().into_metric();
        let tags = metric.tags().expect("Metric should have tags");

        assert_eq!(tags.iter_single().collect::<Vec<_>>(), vec![("tag", "two")]);

        assert_eq!(tags.iter_all().count(), 2);
        for (name, value) in tags.iter_all() {
            assert_eq!(name, "tag");
            assert!(value == Some("one") || value == Some("two"));
        }
    }

    #[tokio::test]
    async fn count_exceptions() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "backtrace"
            name = "exception_total"
            "#,
        );

        let event = create_event("backtrace", "message");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "exception_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn count_exceptions_no_match() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "backtrace"
            name = "exception_total"
            "#,
        );

        let event = create_event("success", "42");
        assert_eq!(do_transform(config, event).await, None);
    }

    #[tokio::test]
    async fn sum_order_amounts() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "amount"
            name = "amount_total"
            increment_by_value = true
            "#,
        );

        let event = create_event("amount", "33.99");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "amount_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 33.99 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn count_absolute() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "amount"
            name = "amount_total"
            increment_by_value = true
            kind = "absolute"
            "#,
        );

        let event = create_event("amount", "33.99");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "amount_total",
                MetricKind::Absolute,
                MetricValue::Counter { value: 33.99 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn memory_usage_gauge() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "gauge"
            field = "memory_rss"
            name = "memory_rss_bytes"
            "#,
        );

        let event = create_event("memory_rss", "123");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));

        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "memory_rss_bytes",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 123.0 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn parse_failure() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"
            name = "status_total"
            increment_by_value = true
            "#,
        );

        let event = create_event("status", "not a number");
        assert_eq!(do_transform(config, event).await, None);
    }

    #[tokio::test]
    async fn missing_field() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"
            name = "status_total"
            "#,
        );

        let event = create_event("not foo", "not a number");
        assert_eq!(do_transform(config, event).await, None);
    }

    #[tokio::test]
    async fn null_field() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"
            name = "status_total"
            "#,
        );

        let event = create_event("status", Value::Null);
        assert_eq!(do_transform(config, event).await, None);
    }

    #[tokio::test]
    async fn multiple_metrics() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "counter"
            field = "status"

            [[metrics]]
            type = "counter"
            field = "backtrace"
            name = "exception_total"
            "#,
        );

        let mut event = Event::Log(LogEvent::from("i am a log"));
        event
            .as_mut_log()
            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
        event.as_mut_log().insert("status", "42");
        event.as_mut_log().insert("backtrace", "message");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let output = do_transform_multiple_events(config, event, 2).await;

        assert_eq!(2, output.len());
        assert_eq!(
            output[0].clone().into_metric(),
            Metric::new_with_metadata(
                "status",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata.clone(),
            )
            .with_timestamp(Some(ts()))
        );
        assert_eq!(
            output[1].clone().into_metric(),
            Metric::new_with_metadata(
                "exception_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn multiple_metrics_with_multiple_templates() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "set"
            field = "status"
            name = "{{host}}_{{worker}}_status_set"

            [[metrics]]
            type = "counter"
            field = "backtrace"
            name = "{{service}}_exception_total"
            namespace = "{{host}}"
            "#,
        );

        let mut event = Event::Log(LogEvent::from("i am a log"));
        event
            .as_mut_log()
            .insert(log_schema().timestamp_key_target_path().unwrap(), ts());
        event.as_mut_log().insert("status", "42");
        event.as_mut_log().insert("backtrace", "message");
        event.as_mut_log().insert("host", "local");
        event.as_mut_log().insert("worker", "abc");
        event.as_mut_log().insert("service", "xyz");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let output = do_transform_multiple_events(config, event, 2).await;

        assert_eq!(2, output.len());
        assert_eq!(
            output[0].as_metric(),
            &Metric::new_with_metadata(
                "local_abc_status_set",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["42".into()].into_iter().collect()
                },
                metadata.clone(),
            )
            .with_timestamp(Some(ts()))
        );
        assert_eq!(
            output[1].as_metric(),
            &Metric::new_with_metadata(
                "xyz_exception_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
                metadata,
            )
            .with_namespace(Some("local"))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn user_ip_set() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "set"
            field = "user_ip"
            name = "unique_user_ip"
            "#,
        );

        let event = create_event("user_ip", "1.2.3.4");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));
        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "unique_user_ip",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["1.2.3.4".into()].into_iter().collect()
                },
                metadata,
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn response_time_histogram() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "histogram"
            field = "response_time"
            "#,
        );

        let event = create_event("response_time", "2.5");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "response_time",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![2.5 => 1],
                    statistic: StatisticKind::Histogram
                },
                metadata
            )
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn response_time_summary() {
        let config = parse_config(
            r#"
            [[metrics]]
            type = "summary"
            field = "response_time"
            "#,
        );

        let event = create_event("response_time", "2.5");
        let mut metadata =
            event
                .metadata()
                .clone()
                .with_origin_metadata(DatadogMetricOriginMetadata::new(
                    None,
                    None,
                    Some(ORIGIN_SERVICE_VALUE),
                ));

        metadata.set_schema_definition(&Arc::new(Definition::any()));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));

        let metric = do_transform(config, event).await.unwrap();

        assert_eq!(
            metric.into_metric(),
            Metric::new_with_metadata(
                "response_time",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![2.5 => 1],
                    statistic: StatisticKind::Summary
                },
                metadata
            )
            .with_timestamp(Some(ts()))
        );
    }

    fn create_log_event(json_str: &str) -> Event {
        create_log_event_with_namespace(json_str, Some("test_namespace"))
    }

    fn create_log_event_with_namespace(json_str: &str, namespace: Option<&str>) -> Event {
        let mut log_value: Value =
            serde_json::from_str(json_str).expect("JSON was not well-formatted");
        log_value.insert("timestamp", ts());

        if let Some(namespace) = namespace {
            log_value.insert("namespace", namespace);
        }

        let mut metadata = EventMetadata::default();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));

        Event::Log(LogEvent::from_parts(log_value, metadata.clone()))
    }

    #[tokio::test]
    async fn transform_gauge() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "gauge": {
                "value": 990.0
            },
            "kind": "absolute",
            "name": "test.transform.gauge",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.gauge",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 990.0 },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_histogram() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "histogram": {
                "sum": 18.0,
                "count": 5,
                "buckets": [
                    {
                        "upper_limit": 1.0,
                        "count": 1
                    },
                    {
                        "upper_limit": 2.0,
                        "count": 2
                    },
                    {
                        "upper_limit": 5.0,
                        "count": 1
                    },
                    {
                        "upper_limit": 10.0,
                        "count": 1
                    }
                ]
            },
            "kind": "absolute",
            "name": "test.transform.histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.histogram",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    count: 5,
                    sum: 18.0,
                    buckets: vec![
                        Bucket {
                            upper_limit: 1.0,
                            count: 1,
                        },
                        Bucket {
                            upper_limit: 2.0,
                            count: 2,
                        },
                        Bucket {
                            upper_limit: 5.0,
                            count: 1,
                        },
                        Bucket {
                            upper_limit: 10.0,
                            count: 1,
                        },
                    ],
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_distribution_histogram() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "distribution": {
                "samples": [
                    {
                        "value": 1.0,
                        "rate": 1
                    },
                    {
                        "value": 2.0,
                        "rate": 2
                    }
                ],
                "statistic": "histogram"
            },
            "kind": "absolute",
            "name": "test.transform.distribution_histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.distribution_histogram",
                MetricKind::Absolute,
                MetricValue::Distribution {
                    samples: vec![
                        Sample {
                            value: 1.0,
                            rate: 1
                        },
                        Sample {
                            value: 2.0,
                            rate: 2
                        },
                    ],
                    statistic: StatisticKind::Histogram,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_distribution_summary() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "distribution": {
                "samples": [
                    {
                        "value": 1.0,
                        "rate": 1
                    },
                    {
                        "value": 2.0,
                        "rate": 2
                    }
                ],
                "statistic": "summary"
            },
            "kind": "absolute",
            "name": "test.transform.distribution_summary",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.distribution_summary",
                MetricKind::Absolute,
                MetricValue::Distribution {
                    samples: vec![
                        Sample {
                            value: 1.0,
                            rate: 1
                        },
                        Sample {
                            value: 2.0,
                            rate: 2
                        },
                    ],
                    statistic: StatisticKind::Summary,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_summary() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "summary": {
                "sum": 100.0,
                "count": 7,
                "quantiles": [
                    {
                        "quantile": 0.05,
                        "value": 10.0
                    },
                    {
                        "quantile": 0.95,
                        "value": 25.0
                    }
                ]
            },
            "kind": "absolute",
            "name": "test.transform.histogram",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.histogram",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vec![
                        Quantile {
                            quantile: 0.05,
                            value: 10.0,
                        },
                        Quantile {
                            quantile: 0.95,
                            value: 25.0,
                        },
                    ],
                    count: 7,
                    sum: 100.0,
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_counter() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "counter": {
                "value": 10.0
            },
            "kind": "incremental",
            "name": "test.transform.counter",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 10.0 },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_set() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "set": {
                "values": ["990.0", "1234"]
            },
            "kind": "incremental",
            "name": "test.transform.set",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event(json_str);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.set",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["990.0".into(), "1234".into()].into_iter().collect()
                },
                metric.metadata().clone(),
            )
            .with_namespace(Some("test_namespace"))
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }

    #[tokio::test]
    async fn transform_all_metrics_optional_namespace() {
        let config = LogToMetricConfig {
            metrics: None,
            all_metrics: Some(true),
        };

        let json_str = r#"{
            "counter": {
                "value": 10.0
            },
            "kind": "incremental",
            "name": "test.transform.counter",
            "tags": {
                "env": "test_env",
                "host": "localhost"
            }
        }"#;
        let log = create_log_event_with_namespace(json_str, None);
        let metric = do_transform(config, log.clone()).await.unwrap();
        assert_eq!(
            *metric.as_metric(),
            Metric::new_with_metadata(
                "test.transform.counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 10.0 },
                metric.metadata().clone(),
            )
            .with_tags(Some(metric_tags!(
                "env" => "test_env",
                "host" => "localhost",
            )))
            .with_timestamp(Some(ts()))
        );
    }
}