1use chrono::Utc;
2use serde_json::Value;
3use std::collections::{BTreeMap, BTreeSet};
4use vector_lib::codecs::MetricTagValues;
5use vector_lib::config::LogNamespace;
6use vector_lib::configurable::configurable_component;
7use vector_lib::lookup::{event_path, owned_value_path, path, PathPrefix};
8use vector_lib::TimeZone;
9use vrl::path::OwnedValuePath;
10use vrl::value::kind::Collection;
11use vrl::value::Kind;
12
13use crate::config::OutputId;
14use crate::{
15 config::{
16 log_schema, DataType, GenerateConfig, Input, TransformConfig, TransformContext,
17 TransformOutput,
18 },
19 event::{self, Event, LogEvent, Metric},
20 internal_events::MetricToLogSerializeError,
21 schema::Definition,
22 transforms::{FunctionTransform, OutputBuffer, Transform},
23 types::Conversion,
24};
25
/// Configuration for the `metric_to_log` transform.
#[configurable_component(transform("metric_to_log", "Convert metric events to log events."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MetricToLogConfig {
    /// Name of the metric tag to treat as the source host.
    ///
    /// In the `Legacy` log namespace, the value found at `tags.<host_tag>` is removed from
    /// the generated log event and inserted at the global host key target path. When unset,
    /// the tag named after the global host key is used instead.
    #[configurable(metadata(docs::examples = "host", docs::examples = "hostname"))]
    pub host_tag: Option<String>,

    /// Time zone used when converting the serialized metric timestamp in the `Legacy` log
    /// namespace.
    ///
    /// When unset, the time zone from the global options is used.
    pub timezone: Option<TimeZone>,

    /// Log namespace selection for this transform.
    ///
    /// The value is merged with the global log namespace setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,

    /// Controls how metric tag values are handled before the metric is serialized.
    ///
    /// With `MetricTagValues::Single`, every tag is reduced to a single value first; any other
    /// setting serializes the tags as they are.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,
}
65
66impl MetricToLogConfig {
67 pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
68 MetricToLog::new(
69 self.host_tag.as_deref(),
70 self.timezone.unwrap_or_else(|| context.globals.timezone()),
71 context.log_namespace(self.log_namespace),
72 self.metric_tag_values,
73 )
74 }
75}
76
77impl GenerateConfig for MetricToLogConfig {
78 fn generate_config() -> toml::Value {
79 toml::Value::try_from(Self {
80 host_tag: Some("host-tag".to_string()),
81 timezone: None,
82 log_namespace: None,
83 metric_tag_values: MetricTagValues::Single,
84 })
85 .unwrap()
86 }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "metric_to_log")]
91impl TransformConfig for MetricToLogConfig {
92 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
93 Ok(Transform::function(self.build_transform(context)))
94 }
95
96 fn input(&self) -> Input {
97 Input::metric()
98 }
99
100 fn outputs(
101 &self,
102 _: vector_lib::enrichment::TableRegistry,
103 input_definitions: &[(OutputId, Definition)],
104 global_log_namespace: LogNamespace,
105 ) -> Vec<TransformOutput> {
106 let log_namespace = global_log_namespace.merge(self.log_namespace);
107 let schema_definition = schema_definition(log_namespace);
108
109 vec![TransformOutput::new(
110 DataType::Log,
111 input_definitions
112 .iter()
113 .map(|(output, _)| (output.clone(), schema_definition.clone()))
114 .collect(),
115 )]
116 }
117
118 fn enable_concurrency(&self) -> bool {
119 true
120 }
121}
122
123fn schema_definition(log_namespace: LogNamespace) -> Definition {
124 let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace]))
125 .with_event_field(&owned_value_path!("name"), Kind::bytes(), None)
126 .with_event_field(
127 &owned_value_path!("namespace"),
128 Kind::bytes().or_undefined(),
129 None,
130 )
131 .with_event_field(
132 &owned_value_path!("tags"),
133 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
134 None,
135 )
136 .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None)
137 .with_event_field(
138 &owned_value_path!("counter"),
139 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
140 None,
141 )
142 .with_event_field(
143 &owned_value_path!("gauge"),
144 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
145 None,
146 )
147 .with_event_field(
148 &owned_value_path!("set"),
149 Kind::object(Collection::empty().with_known(
150 "values",
151 Kind::array(Collection::empty().with_unknown(Kind::bytes())),
152 ))
153 .or_undefined(),
154 None,
155 )
156 .with_event_field(
157 &owned_value_path!("distribution"),
158 Kind::object(
159 Collection::empty()
160 .with_known(
161 "samples",
162 Kind::array(
163 Collection::empty().with_unknown(Kind::object(
164 Collection::empty()
165 .with_known("value", Kind::float())
166 .with_known("rate", Kind::integer()),
167 )),
168 ),
169 )
170 .with_known("statistic", Kind::bytes()),
171 )
172 .or_undefined(),
173 None,
174 )
175 .with_event_field(
176 &owned_value_path!("aggregated_histogram"),
177 Kind::object(
178 Collection::empty()
179 .with_known(
180 "buckets",
181 Kind::array(
182 Collection::empty().with_unknown(Kind::object(
183 Collection::empty()
184 .with_known("upper_limit", Kind::float())
185 .with_known("count", Kind::integer()),
186 )),
187 ),
188 )
189 .with_known("count", Kind::integer())
190 .with_known("sum", Kind::float()),
191 )
192 .or_undefined(),
193 None,
194 )
195 .with_event_field(
196 &owned_value_path!("aggregated_summary"),
197 Kind::object(
198 Collection::empty()
199 .with_known(
200 "quantiles",
201 Kind::array(
202 Collection::empty().with_unknown(Kind::object(
203 Collection::empty()
204 .with_known("quantile", Kind::float())
205 .with_known("value", Kind::float()),
206 )),
207 ),
208 )
209 .with_known("count", Kind::integer())
210 .with_known("sum", Kind::float()),
211 )
212 .or_undefined(),
213 None,
214 )
215 .with_event_field(
216 &owned_value_path!("sketch"),
217 Kind::any().or_undefined(),
218 None,
219 );
220
221 match log_namespace {
222 LogNamespace::Vector => {
223 schema_definition = schema_definition.with_event_field(
225 &owned_value_path!("timestamp"),
226 Kind::bytes().or_undefined(),
227 None,
228 );
229
230 schema_definition = schema_definition.with_metadata_field(
233 &owned_value_path!("vector"),
234 Kind::object(Collection::empty()),
235 None,
236 );
237 }
238 LogNamespace::Legacy => {
239 if let Some(timestamp_key) = log_schema().timestamp_key() {
240 schema_definition =
241 schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None);
242 }
243
244 schema_definition = schema_definition.with_event_field(
245 log_schema().host_key().expect("valid host key"),
246 Kind::bytes().or_undefined(),
247 None,
248 );
249 }
250 }
251 schema_definition
252}
253
/// Transform that converts metric events into log events.
#[derive(Clone, Debug)]
pub struct MetricToLog {
    /// Event path (rooted at `tags`) of the tag holding the host value, if any.
    host_tag: Option<OwnedValuePath>,
    /// Time zone used to convert the serialized timestamp in the `Legacy` log namespace.
    timezone: TimeZone,
    /// Log namespace the produced log events are written for.
    log_namespace: LogNamespace,
    /// Tag handling mode; `MetricTagValues::Single` reduces every tag to a single value
    /// before the metric is serialized.
    tag_values: MetricTagValues,
}
261
262impl MetricToLog {
263 pub fn new(
264 host_tag: Option<&str>,
265 timezone: TimeZone,
266 log_namespace: LogNamespace,
267 tag_values: MetricTagValues,
268 ) -> Self {
269 Self {
270 host_tag: host_tag.map_or(
271 log_schema().host_key().cloned().map(|mut key| {
272 key.push_front_field("tags");
273 key
274 }),
275 |host| Some(owned_value_path!("tags", host)),
276 ),
277 timezone,
278 log_namespace,
279 tag_values,
280 }
281 }
282
283 pub fn transform_one(&self, mut metric: Metric) -> Option<LogEvent> {
284 if self.tag_values == MetricTagValues::Single {
285 metric.reduce_tags_to_single();
286 }
287 serde_json::to_value(&metric)
288 .map_err(|error| emit!(MetricToLogSerializeError { error }))
289 .ok()
290 .and_then(|value| match value {
291 Value::Object(object) => {
292 let (_, _, metadata) = metric.into_parts();
293 let mut log = LogEvent::new_with_metadata(metadata);
294
295 for (key, value) in object {
297 log.insert(event_path!(&key), value);
298 }
299
300 if self.log_namespace == LogNamespace::Legacy {
301 let timestamp = log
304 .remove(event_path!("timestamp"))
305 .and_then(|value| {
306 Conversion::Timestamp(self.timezone)
307 .convert(value.coerce_to_bytes())
308 .ok()
309 })
310 .unwrap_or_else(|| event::Value::Timestamp(Utc::now()));
311
312 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
313
314 if let Some(host_tag) = &self.host_tag {
315 if let Some(host_value) =
316 log.remove_prune((PathPrefix::Event, host_tag), true)
317 {
318 log.maybe_insert(log_schema().host_key_target_path(), host_value);
319 }
320 }
321 }
322 if self.log_namespace == LogNamespace::Vector {
323 log.insert(
326 (PathPrefix::Metadata, path!("vector")),
327 vrl::value::Value::Object(BTreeMap::new()),
328 );
329 }
330 Some(log)
331 }
332 _ => None,
333 })
334 }
335}
336
337impl FunctionTransform for MetricToLog {
338 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
339 let retval: Option<Event> = self
340 .transform_one(event.into_metric())
341 .map(|log| log.into());
342 output.extend(retval.into_iter())
343 }
344}
345
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
    use futures::executor::block_on;
    use proptest::prelude::*;
    use similar_asserts::assert_eq;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vector_lib::{event::EventMetadata, metric_tags};

    use super::*;
    use crate::event::{
        metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet},
        KeyString, Metric, Value,
    };
    use crate::test_util::{components::assert_transform_compliance, random_string};
    use crate::transforms::test::create_topology;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MetricToLogConfig>();
    }

    /// Sends `metric` through a topology containing only this transform (configured with
    /// `host_tag = "host"` and `log_namespace = Some(false)`) and returns the log event it
    /// produced, asserting that nothing else is emitted once the topology is stopped.
    async fn do_transform(metric: Metric) -> Option<LogEvent> {
        assert_transform_compliance(async move {
            let config = MetricToLogConfig {
                host_tag: Some("host".into()),
                timezone: None,
                log_namespace: Some(false),
                ..Default::default()
            };
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            tx.send(metric.into()).await.unwrap();

            let result = out.recv().await;

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);

            result
        })
        .await
        .map(|e| e.into_log())
    }

    /// Fixed timestamp shared by all test metrics.
    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp")
    }

    /// Tag set containing the `host` tag plus one unrelated tag.
    fn tags() -> MetricTags {
        metric_tags! {
            "host" => "localhost",
            "some_tag" => "some_value",
        }
    }

    fn event_metadata() -> EventMetadata {
        EventMetadata::default().with_source_type("unit_test_stream")
    }

    /// Metadata the log produced from `metric` by `do_transform` is expected to carry: the
    /// metric's own metadata plus the source id, upstream id and `Legacy` schema definition
    /// that the tests expect to find on the produced log.
    fn expected_metadata(metric: &Metric) -> EventMetadata {
        let mut metadata = metric.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
        metadata
    }

    #[tokio::test]
    async fn transform_counter() {
        let counter = Metric::new_with_metadata(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
            event_metadata(),
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&counter);

        let log = do_transform(counter).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        // The `host` tag is moved out of `tags` to the top-level `host` field.
        assert_eq!(
            collected,
            vec![
                (KeyString::from("counter.value"), &Value::from(1.0)),
                (KeyString::from("host"), &Value::from("localhost")),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("counter")),
                (KeyString::from("tags.some_tag"), &Value::from("some_value")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_gauge() {
        let gauge = Metric::new_with_metadata(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.0 },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&gauge);

        let log = do_transform(gauge).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (KeyString::from("gauge.value"), &Value::from(1.0)),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("gauge")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_set() {
        let set = Metric::new_with_metadata(
            "set",
            MetricKind::Absolute,
            MetricValue::Set {
                values: vec!["one".into(), "two".into()].into_iter().collect(),
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&set);

        let log = do_transform(set).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("set")),
                (KeyString::from("set.values[0]"), &Value::from("one")),
                (KeyString::from("set.values[1]"), &Value::from("two")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_distribution() {
        let distro = Metric::new_with_metadata(
            "distro",
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 10, 2.0 => 20],
                statistic: StatisticKind::Histogram,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&distro);

        let log = do_transform(distro).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("distribution.samples[0].rate"),
                    &Value::from(10)
                ),
                (
                    KeyString::from("distribution.samples[0].value"),
                    &Value::from(1.0)
                ),
                (
                    KeyString::from("distribution.samples[1].rate"),
                    &Value::from(20)
                ),
                (
                    KeyString::from("distribution.samples[1].value"),
                    &Value::from(2.0)
                ),
                (
                    KeyString::from("distribution.statistic"),
                    &Value::from("histogram")
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("distro")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_histogram() {
        let histo = Metric::new_with_metadata(
            "histo",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: vector_lib::buckets![1.0 => 10, 2.0 => 20],
                count: 30,
                sum: 50.0,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&histo);

        let log = do_transform(histo).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("aggregated_histogram.buckets[0].count"),
                    &Value::from(10)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[0].upper_limit"),
                    &Value::from(1.0)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[1].count"),
                    &Value::from(20)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[1].upper_limit"),
                    &Value::from(2.0)
                ),
                (
                    KeyString::from("aggregated_histogram.count"),
                    &Value::from(30)
                ),
                (
                    KeyString::from("aggregated_histogram.sum"),
                    &Value::from(50.0)
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("histo")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_summary() {
        let summary = Metric::new_with_metadata(
            "summary",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![50.0 => 10.0, 90.0 => 20.0],
                count: 30,
                sum: 50.0,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let metadata = expected_metadata(&summary);

        let log = do_transform(summary).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("aggregated_summary.count"),
                    &Value::from(30)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[0].quantile"),
                    &Value::from(50.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[0].value"),
                    &Value::from(10.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[1].quantile"),
                    &Value::from(90.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[1].value"),
                    &Value::from(20.0)
                ),
                (
                    KeyString::from("aggregated_summary.sum"),
                    &Value::from(50.0)
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("summary")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    proptest! {
        #[test]
        fn transform_tag_single_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Single,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            // The resulting tag must be either a single string value or not present.
            let value = values.into_single().map(|value| Value::Bytes(value.into()));
            assert_eq!(tags.get(&*name), value.as_ref());
        }

        #[test]
        fn transform_tag_full_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Full,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            let tag = tags.get(&*name);
            match values.len() {
                // Empty tag set => missing tag
                0 => assert_eq!(tag, None),
                // Single value tag => scalar value
                1 => assert_eq!(tag, Some(&tag_to_value(values.into_iter().next().unwrap()))),
                // Multi-valued tag => array value
                _ => assert_eq!(tag, Some(&Value::Array(values.into_iter().map(tag_to_value).collect()))),
            }
        }
    }

    fn tag_to_value(tag: TagValue) -> Value {
        tag.into_option().into()
    }

    /// Runs a counter carrying `tags` directly through the transform (no topology), built
    /// with the given `metric_tag_values`, and returns the `tags` field of the produced log.
    async fn transform_tags(metric_tag_values: MetricTagValues, tags: MetricTags) -> Value {
        let counter = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        )
        .with_tags(Some(tags))
        .with_timestamp(Some(ts()));

        let mut output = OutputBuffer::with_capacity(1);

        MetricToLogConfig {
            metric_tag_values,
            ..Default::default()
        }
        .build_transform(&TransformContext::default())
        .transform(&mut output, counter.into());

        assert_eq!(output.len(), 1);
        output.into_events().next().unwrap().into_log()["tags"].clone()
    }
}