1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::Utc;
4use serde_json::Value;
5use vector_lib::{
6 TimeZone,
7 codecs::MetricTagValues,
8 config::LogNamespace,
9 configurable::configurable_component,
10 lookup::{PathPrefix, event_path, owned_value_path, path},
11};
12use vrl::{
13 path::OwnedValuePath,
14 value::{Kind, kind::Collection},
15};
16
17use crate::{
18 config::{
19 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
20 TransformOutput, log_schema,
21 },
22 event::{self, Event, LogEvent, Metric},
23 internal_events::MetricToLogSerializeError,
24 schema::Definition,
25 transforms::{FunctionTransform, OutputBuffer, Transform},
26 types::Conversion,
27};
28
/// Configuration for the `metric_to_log` transform.
#[configurable_component(transform("metric_to_log", "Convert metric events to log events."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MetricToLogConfig {
    /// Name of the metric tag that carries the source host.
    ///
    /// In the legacy log namespace, the value found at `tags.<host_tag>` is removed from the
    /// tags and written to the log schema's host key. When unset, the tag named after the
    /// log schema's host key is used instead.
    #[configurable(metadata(docs::examples = "host", docs::examples = "hostname"))]
    pub host_tag: Option<String>,

    /// Time zone used when converting the serialized metric timestamp into a timestamp value.
    ///
    /// When unset, the global time zone from the transform context is used.
    pub timezone: Option<TimeZone>,

    /// Per-transform log namespace selection, merged with the global log namespace setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,

    /// Controls how metric tag values are encoded.
    ///
    /// When set to `single`, every tag is reduced to a single value before the metric is
    /// converted; otherwise the tags are serialized as they are.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,
}
68
69impl MetricToLogConfig {
70 pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
71 MetricToLog::new(
72 self.host_tag.as_deref(),
73 self.timezone.unwrap_or_else(|| context.globals.timezone()),
74 context.log_namespace(self.log_namespace),
75 self.metric_tag_values,
76 )
77 }
78}
79
80impl GenerateConfig for MetricToLogConfig {
81 fn generate_config() -> toml::Value {
82 toml::Value::try_from(Self {
83 host_tag: Some("host-tag".to_string()),
84 timezone: None,
85 log_namespace: None,
86 metric_tag_values: MetricTagValues::Single,
87 })
88 .unwrap()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "metric_to_log")]
94impl TransformConfig for MetricToLogConfig {
95 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
96 Ok(Transform::function(self.build_transform(context)))
97 }
98
99 fn input(&self) -> Input {
100 Input::metric()
101 }
102
103 fn outputs(
104 &self,
105 _: vector_lib::enrichment::TableRegistry,
106 input_definitions: &[(OutputId, Definition)],
107 global_log_namespace: LogNamespace,
108 ) -> Vec<TransformOutput> {
109 let log_namespace = global_log_namespace.merge(self.log_namespace);
110 let schema_definition = schema_definition(log_namespace);
111
112 vec![TransformOutput::new(
113 DataType::Log,
114 input_definitions
115 .iter()
116 .map(|(output, _)| (output.clone(), schema_definition.clone()))
117 .collect(),
118 )]
119 }
120
121 fn enable_concurrency(&self) -> bool {
122 true
123 }
124}
125
126fn schema_definition(log_namespace: LogNamespace) -> Definition {
127 let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace]))
128 .with_event_field(&owned_value_path!("name"), Kind::bytes(), None)
129 .with_event_field(
130 &owned_value_path!("namespace"),
131 Kind::bytes().or_undefined(),
132 None,
133 )
134 .with_event_field(
135 &owned_value_path!("tags"),
136 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
137 None,
138 )
139 .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None)
140 .with_event_field(
141 &owned_value_path!("counter"),
142 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
143 None,
144 )
145 .with_event_field(
146 &owned_value_path!("gauge"),
147 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
148 None,
149 )
150 .with_event_field(
151 &owned_value_path!("set"),
152 Kind::object(Collection::empty().with_known(
153 "values",
154 Kind::array(Collection::empty().with_unknown(Kind::bytes())),
155 ))
156 .or_undefined(),
157 None,
158 )
159 .with_event_field(
160 &owned_value_path!("distribution"),
161 Kind::object(
162 Collection::empty()
163 .with_known(
164 "samples",
165 Kind::array(
166 Collection::empty().with_unknown(Kind::object(
167 Collection::empty()
168 .with_known("value", Kind::float())
169 .with_known("rate", Kind::integer()),
170 )),
171 ),
172 )
173 .with_known("statistic", Kind::bytes()),
174 )
175 .or_undefined(),
176 None,
177 )
178 .with_event_field(
179 &owned_value_path!("aggregated_histogram"),
180 Kind::object(
181 Collection::empty()
182 .with_known(
183 "buckets",
184 Kind::array(
185 Collection::empty().with_unknown(Kind::object(
186 Collection::empty()
187 .with_known("upper_limit", Kind::float())
188 .with_known("count", Kind::integer()),
189 )),
190 ),
191 )
192 .with_known("count", Kind::integer())
193 .with_known("sum", Kind::float()),
194 )
195 .or_undefined(),
196 None,
197 )
198 .with_event_field(
199 &owned_value_path!("aggregated_summary"),
200 Kind::object(
201 Collection::empty()
202 .with_known(
203 "quantiles",
204 Kind::array(
205 Collection::empty().with_unknown(Kind::object(
206 Collection::empty()
207 .with_known("quantile", Kind::float())
208 .with_known("value", Kind::float()),
209 )),
210 ),
211 )
212 .with_known("count", Kind::integer())
213 .with_known("sum", Kind::float()),
214 )
215 .or_undefined(),
216 None,
217 )
218 .with_event_field(
219 &owned_value_path!("sketch"),
220 Kind::any().or_undefined(),
221 None,
222 );
223
224 match log_namespace {
225 LogNamespace::Vector => {
226 schema_definition = schema_definition.with_event_field(
228 &owned_value_path!("timestamp"),
229 Kind::bytes().or_undefined(),
230 None,
231 );
232
233 schema_definition = schema_definition.with_metadata_field(
236 &owned_value_path!("vector"),
237 Kind::object(Collection::empty()),
238 None,
239 );
240 }
241 LogNamespace::Legacy => {
242 if let Some(timestamp_key) = log_schema().timestamp_key() {
243 schema_definition =
244 schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None);
245 }
246
247 schema_definition = schema_definition.with_event_field(
248 log_schema().host_key().expect("valid host key"),
249 Kind::bytes().or_undefined(),
250 None,
251 );
252 }
253 }
254 schema_definition
255}
256
/// Function transform that turns a metric event into a log event.
#[derive(Clone, Debug)]
pub struct MetricToLog {
    /// Path (below `tags`) of the tag whose value is moved to the host key in the legacy
    /// namespace; `None` when neither an explicit tag nor a log schema host key exists.
    host_tag: Option<OwnedValuePath>,
    /// Time zone used when converting the serialized timestamp in the legacy namespace.
    timezone: TimeZone,
    /// Log namespace that decides how timestamp, host and metadata are laid out.
    log_namespace: LogNamespace,
    /// Tag encoding mode; `Single` reduces every tag to one value before conversion.
    tag_values: MetricTagValues,
}
264
265impl MetricToLog {
266 pub fn new(
267 host_tag: Option<&str>,
268 timezone: TimeZone,
269 log_namespace: LogNamespace,
270 tag_values: MetricTagValues,
271 ) -> Self {
272 Self {
273 host_tag: host_tag.map_or(
274 log_schema().host_key().cloned().map(|mut key| {
275 key.push_front_field("tags");
276 key
277 }),
278 |host| Some(owned_value_path!("tags", host)),
279 ),
280 timezone,
281 log_namespace,
282 tag_values,
283 }
284 }
285
286 pub fn transform_one(&self, mut metric: Metric) -> Option<LogEvent> {
287 if self.tag_values == MetricTagValues::Single {
288 metric.reduce_tags_to_single();
289 }
290 serde_json::to_value(&metric)
291 .map_err(|error| emit!(MetricToLogSerializeError { error }))
292 .ok()
293 .and_then(|value| match value {
294 Value::Object(object) => {
295 let (_, _, metadata) = metric.into_parts();
296 let mut log = LogEvent::new_with_metadata(metadata);
297
298 for (key, value) in object {
300 log.insert(event_path!(&key), value);
301 }
302
303 if self.log_namespace == LogNamespace::Legacy {
304 let timestamp = log
307 .remove(event_path!("timestamp"))
308 .and_then(|value| {
309 Conversion::Timestamp(self.timezone)
310 .convert(value.coerce_to_bytes())
311 .ok()
312 })
313 .unwrap_or_else(|| event::Value::Timestamp(Utc::now()));
314
315 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
316
317 if let Some(host_tag) = &self.host_tag
318 && let Some(host_value) =
319 log.remove_prune((PathPrefix::Event, host_tag), true)
320 {
321 log.maybe_insert(log_schema().host_key_target_path(), host_value);
322 }
323 }
324 if self.log_namespace == LogNamespace::Vector {
325 log.insert(
328 (PathPrefix::Metadata, path!("vector")),
329 vrl::value::Value::Object(BTreeMap::new()),
330 );
331 }
332 Some(log)
333 }
334 _ => None,
335 })
336 }
337}
338
339impl FunctionTransform for MetricToLog {
340 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
341 let retval: Option<Event> = self
342 .transform_one(event.into_metric())
343 .map(|log| log.into());
344 output.extend(retval.into_iter())
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use std::sync::Arc;
351
352 use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
353 use futures::executor::block_on;
354 use proptest::prelude::*;
355 use similar_asserts::assert_eq;
356 use tokio::sync::mpsc;
357 use tokio_stream::wrappers::ReceiverStream;
358 use vector_lib::{config::ComponentKey, event::EventMetadata, metric_tags};
359
360 use super::*;
361 use crate::{
362 event::{
363 KeyString, Metric, Value,
364 metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet},
365 },
366 test_util::{components::assert_transform_compliance, random_string},
367 transforms::test::create_topology,
368 };
369
370 #[test]
371 fn generate_config() {
372 crate::test_util::test_generate_config::<MetricToLogConfig>();
373 }
374
375 async fn do_transform(metric: Metric) -> Option<LogEvent> {
376 assert_transform_compliance(async move {
377 let config = MetricToLogConfig {
378 host_tag: Some("host".into()),
379 timezone: None,
380 log_namespace: Some(false),
381 ..Default::default()
382 };
383 let (tx, rx) = mpsc::channel(1);
384 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
385
386 tx.send(metric.into()).await.unwrap();
387
388 let result = out.recv().await;
389
390 drop(tx);
391 topology.stop().await;
392 assert_eq!(out.recv().await, None);
393
394 result
395 })
396 .await
397 .map(|e| e.into_log())
398 }
399
400 fn ts() -> DateTime<Utc> {
401 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
402 .single()
403 .and_then(|t| t.with_nanosecond(11))
404 .expect("invalid timestamp")
405 }
406
407 fn tags() -> MetricTags {
408 metric_tags! {
409 "host" => "localhost",
410 "some_tag" => "some_value",
411 }
412 }
413
414 fn event_metadata() -> EventMetadata {
415 EventMetadata::default().with_source_type("unit_test_stream")
416 }
417
418 #[tokio::test]
419 async fn transform_counter() {
420 let counter = Metric::new_with_metadata(
421 "counter",
422 MetricKind::Absolute,
423 MetricValue::Counter { value: 1.0 },
424 event_metadata(),
425 )
426 .with_tags(Some(tags()))
427 .with_timestamp(Some(ts()));
428 let mut metadata = counter.metadata().clone();
429 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
430 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
431 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
432
433 let log = do_transform(counter).await.unwrap();
434 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
435
436 assert_eq!(
437 collected,
438 vec![
439 (KeyString::from("counter.value"), &Value::from(1.0)),
440 (KeyString::from("host"), &Value::from("localhost")),
441 (KeyString::from("kind"), &Value::from("absolute")),
442 (KeyString::from("name"), &Value::from("counter")),
443 (KeyString::from("tags.some_tag"), &Value::from("some_value")),
444 (KeyString::from("timestamp"), &Value::from(ts())),
445 ]
446 );
447 assert_eq!(log.metadata(), &metadata);
448 }
449
450 #[tokio::test]
451 async fn transform_gauge() {
452 let gauge = Metric::new_with_metadata(
453 "gauge",
454 MetricKind::Absolute,
455 MetricValue::Gauge { value: 1.0 },
456 event_metadata(),
457 )
458 .with_timestamp(Some(ts()));
459 let mut metadata = gauge.metadata().clone();
460 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
461 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
462 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
463
464 let log = do_transform(gauge).await.unwrap();
465 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
466
467 assert_eq!(
468 collected,
469 vec![
470 (KeyString::from("gauge.value"), &Value::from(1.0)),
471 (KeyString::from("kind"), &Value::from("absolute")),
472 (KeyString::from("name"), &Value::from("gauge")),
473 (KeyString::from("timestamp"), &Value::from(ts())),
474 ]
475 );
476 assert_eq!(log.metadata(), &metadata);
477 }
478
479 #[tokio::test]
480 async fn transform_set() {
481 let set = Metric::new_with_metadata(
482 "set",
483 MetricKind::Absolute,
484 MetricValue::Set {
485 values: vec!["one".into(), "two".into()].into_iter().collect(),
486 },
487 event_metadata(),
488 )
489 .with_timestamp(Some(ts()));
490 let mut metadata = set.metadata().clone();
491 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
492 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
493 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
494
495 let log = do_transform(set).await.unwrap();
496 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
497
498 assert_eq!(
499 collected,
500 vec![
501 (KeyString::from("kind"), &Value::from("absolute")),
502 (KeyString::from("name"), &Value::from("set")),
503 (KeyString::from("set.values[0]"), &Value::from("one")),
504 (KeyString::from("set.values[1]"), &Value::from("two")),
505 (KeyString::from("timestamp"), &Value::from(ts())),
506 ]
507 );
508 assert_eq!(log.metadata(), &metadata);
509 }
510
511 #[tokio::test]
512 async fn transform_distribution() {
513 let distro = Metric::new_with_metadata(
514 "distro",
515 MetricKind::Absolute,
516 MetricValue::Distribution {
517 samples: vector_lib::samples![1.0 => 10, 2.0 => 20],
518 statistic: StatisticKind::Histogram,
519 },
520 event_metadata(),
521 )
522 .with_timestamp(Some(ts()));
523 let mut metadata = distro.metadata().clone();
524 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
525 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
526 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
527
528 let log = do_transform(distro).await.unwrap();
529 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
530
531 assert_eq!(
532 collected,
533 vec![
534 (
535 KeyString::from("distribution.samples[0].rate"),
536 &Value::from(10)
537 ),
538 (
539 KeyString::from("distribution.samples[0].value"),
540 &Value::from(1.0)
541 ),
542 (
543 KeyString::from("distribution.samples[1].rate"),
544 &Value::from(20)
545 ),
546 (
547 KeyString::from("distribution.samples[1].value"),
548 &Value::from(2.0)
549 ),
550 (
551 KeyString::from("distribution.statistic"),
552 &Value::from("histogram")
553 ),
554 (KeyString::from("kind"), &Value::from("absolute")),
555 (KeyString::from("name"), &Value::from("distro")),
556 (KeyString::from("timestamp"), &Value::from(ts())),
557 ]
558 );
559 assert_eq!(log.metadata(), &metadata);
560 }
561
562 #[tokio::test]
563 async fn transform_histogram() {
564 let histo = Metric::new_with_metadata(
565 "histo",
566 MetricKind::Absolute,
567 MetricValue::AggregatedHistogram {
568 buckets: vector_lib::buckets![1.0 => 10, 2.0 => 20],
569 count: 30,
570 sum: 50.0,
571 },
572 event_metadata(),
573 )
574 .with_timestamp(Some(ts()));
575 let mut metadata = histo.metadata().clone();
576 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
577 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
578 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
579
580 let log = do_transform(histo).await.unwrap();
581 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
582
583 assert_eq!(
584 collected,
585 vec![
586 (
587 KeyString::from("aggregated_histogram.buckets[0].count"),
588 &Value::from(10)
589 ),
590 (
591 KeyString::from("aggregated_histogram.buckets[0].upper_limit"),
592 &Value::from(1.0)
593 ),
594 (
595 KeyString::from("aggregated_histogram.buckets[1].count"),
596 &Value::from(20)
597 ),
598 (
599 KeyString::from("aggregated_histogram.buckets[1].upper_limit"),
600 &Value::from(2.0)
601 ),
602 (
603 KeyString::from("aggregated_histogram.count"),
604 &Value::from(30)
605 ),
606 (
607 KeyString::from("aggregated_histogram.sum"),
608 &Value::from(50.0)
609 ),
610 (KeyString::from("kind"), &Value::from("absolute")),
611 (KeyString::from("name"), &Value::from("histo")),
612 (KeyString::from("timestamp"), &Value::from(ts())),
613 ]
614 );
615 assert_eq!(log.metadata(), &metadata);
616 }
617
618 #[tokio::test]
619 async fn transform_summary() {
620 let summary = Metric::new_with_metadata(
621 "summary",
622 MetricKind::Absolute,
623 MetricValue::AggregatedSummary {
624 quantiles: vector_lib::quantiles![50.0 => 10.0, 90.0 => 20.0],
625 count: 30,
626 sum: 50.0,
627 },
628 event_metadata(),
629 )
630 .with_timestamp(Some(ts()));
631 let mut metadata = summary.metadata().clone();
632 metadata.set_source_id(Arc::new(ComponentKey::from("in")));
633 metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
634 metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
635
636 let log = do_transform(summary).await.unwrap();
637 let collected: Vec<_> = log.all_event_fields().unwrap().collect();
638
639 assert_eq!(
640 collected,
641 vec![
642 (
643 KeyString::from("aggregated_summary.count"),
644 &Value::from(30)
645 ),
646 (
647 KeyString::from("aggregated_summary.quantiles[0].quantile"),
648 &Value::from(50.0)
649 ),
650 (
651 KeyString::from("aggregated_summary.quantiles[0].value"),
652 &Value::from(10.0)
653 ),
654 (
655 KeyString::from("aggregated_summary.quantiles[1].quantile"),
656 &Value::from(90.0)
657 ),
658 (
659 KeyString::from("aggregated_summary.quantiles[1].value"),
660 &Value::from(20.0)
661 ),
662 (
663 KeyString::from("aggregated_summary.sum"),
664 &Value::from(50.0)
665 ),
666 (KeyString::from("kind"), &Value::from("absolute")),
667 (KeyString::from("name"), &Value::from("summary")),
668 (KeyString::from("timestamp"), &Value::from(ts())),
669 ]
670 );
671 assert_eq!(log.metadata(), &metadata);
672 }
673
    proptest! {
        // With `MetricTagValues::Single`, the emitted tag must equal whatever
        // `TagValueSet::into_single` yields for the generated value set.
        #[test]
        fn transform_tag_single_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Single,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            // Either a single bytes value, or no tag at all when `into_single` returns `None`.
            let value = values.into_single().map(|value| Value::Bytes(value.into()));
            assert_eq!(tags.get(&*name), value.as_ref());
        }

        // With `MetricTagValues::Full`, the shape of the emitted tag depends on how many
        // values the generated set holds.
        #[test]
        fn transform_tag_full_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Full,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            let tag = tags.get(&*name);
            match values.len() {
                // Empty value set: the tag is missing.
                0 => assert_eq!(tag, None),
                // Exactly one value: the tag is that value, converted by `tag_to_value`.
                1 => assert_eq!(tag, Some(&tag_to_value(values.into_iter().next().unwrap()))),
                // Several values: the tag is an array with one converted entry per value.
                _ => assert_eq!(tag, Some(&Value::Array(values.into_iter().map(tag_to_value).collect()))),
            }
        }
    }
710
711 fn tag_to_value(tag: TagValue) -> Value {
712 tag.into_option().into()
713 }
714
715 async fn transform_tags(metric_tag_values: MetricTagValues, tags: MetricTags) -> Value {
716 let counter = Metric::new(
717 "counter",
718 MetricKind::Absolute,
719 MetricValue::Counter { value: 1.0 },
720 )
721 .with_tags(Some(tags))
722 .with_timestamp(Some(ts()));
723
724 let mut output = OutputBuffer::with_capacity(1);
725
726 MetricToLogConfig {
727 metric_tag_values,
728 ..Default::default()
729 }
730 .build_transform(&TransformContext::default())
731 .transform(&mut output, counter.into());
732
733 assert_eq!(output.len(), 1);
734 output.into_events().next().unwrap().into_log()["tags"].clone()
735 }
736}