1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::Utc;
4use serde_json::Value;
5use vector_lib::{
6 TimeZone,
7 codecs::MetricTagValues,
8 config::LogNamespace,
9 configurable::configurable_component,
10 lookup::{PathPrefix, event_path, owned_value_path, path},
11};
12use vrl::{
13 path::OwnedValuePath,
14 value::{Kind, kind::Collection},
15};
16
17use crate::{
18 config::{
19 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
20 TransformOutput, log_schema,
21 },
22 event::{self, Event, LogEvent, Metric},
23 internal_events::MetricToLogSerializeError,
24 schema::Definition,
25 transforms::{FunctionTransform, OutputBuffer, Transform},
26 types::Conversion,
27};
28
/// Configuration for the `metric_to_log` transform, which converts each incoming metric event
/// into a log event built from the metric's serialized fields.
#[configurable_component(transform("metric_to_log", "Convert metric events to log events."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MetricToLogConfig {
    /// Name of the metric tag that holds the source host.
    ///
    /// When the legacy log namespace is in use, the value of this tag is moved out of the
    /// event's `tags` and into the field named by the global `host_key` log schema option. If
    /// unset, the global `host_key` is used as the tag name.
    #[configurable(metadata(docs::examples = "host", docs::examples = "hostname"))]
    pub host_tag: Option<String>,

    /// Time zone used when converting the metric timestamp for legacy-namespace logs.
    ///
    /// Overrides the global `timezone` option; when unset, the global time zone is used.
    pub timezone: Option<TimeZone>,

    /// The log namespace to use for the emitted logs. This overrides the global setting.
    #[serde(default)]
    #[configurable(metadata(docs::hidden))]
    pub log_namespace: Option<bool>,

    /// Controls how metric tag values are encoded.
    ///
    /// When set to `single`, each tag is reduced to a single value before conversion. When set
    /// to `full`, every value of a multi-valued tag is kept.
    #[serde(default)]
    pub metric_tag_values: MetricTagValues,
}
68
69impl MetricToLogConfig {
70 pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
71 MetricToLog::new(
72 self.host_tag.as_deref(),
73 self.timezone.unwrap_or_else(|| context.globals.timezone()),
74 context.log_namespace(self.log_namespace),
75 self.metric_tag_values,
76 )
77 }
78}
79
80impl GenerateConfig for MetricToLogConfig {
81 fn generate_config() -> toml::Value {
82 toml::Value::try_from(Self {
83 host_tag: Some("host-tag".to_string()),
84 timezone: None,
85 log_namespace: None,
86 metric_tag_values: MetricTagValues::Single,
87 })
88 .unwrap()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "metric_to_log")]
94impl TransformConfig for MetricToLogConfig {
95 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
96 Ok(Transform::function(self.build_transform(context)))
97 }
98
99 fn input(&self) -> Input {
100 Input::metric()
101 }
102
103 fn outputs(
104 &self,
105 context: &TransformContext,
106 input_definitions: &[(OutputId, Definition)],
107 ) -> Vec<TransformOutput> {
108 let log_namespace = context.schema.log_namespace().merge(self.log_namespace);
109 let schema_definition = schema_definition(log_namespace);
110
111 vec![TransformOutput::new(
112 DataType::Log,
113 input_definitions
114 .iter()
115 .map(|(output, _)| (output.clone(), schema_definition.clone()))
116 .collect(),
117 )]
118 }
119
120 fn enable_concurrency(&self) -> bool {
121 true
122 }
123}
124
125fn schema_definition(log_namespace: LogNamespace) -> Definition {
126 let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace]))
127 .with_event_field(&owned_value_path!("name"), Kind::bytes(), None)
128 .with_event_field(
129 &owned_value_path!("namespace"),
130 Kind::bytes().or_undefined(),
131 None,
132 )
133 .with_event_field(
134 &owned_value_path!("tags"),
135 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
136 None,
137 )
138 .with_event_field(&owned_value_path!("kind"), Kind::bytes(), None)
139 .with_event_field(
140 &owned_value_path!("counter"),
141 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
142 None,
143 )
144 .with_event_field(
145 &owned_value_path!("gauge"),
146 Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
147 None,
148 )
149 .with_event_field(
150 &owned_value_path!("set"),
151 Kind::object(Collection::empty().with_known(
152 "values",
153 Kind::array(Collection::empty().with_unknown(Kind::bytes())),
154 ))
155 .or_undefined(),
156 None,
157 )
158 .with_event_field(
159 &owned_value_path!("distribution"),
160 Kind::object(
161 Collection::empty()
162 .with_known(
163 "samples",
164 Kind::array(
165 Collection::empty().with_unknown(Kind::object(
166 Collection::empty()
167 .with_known("value", Kind::float())
168 .with_known("rate", Kind::integer()),
169 )),
170 ),
171 )
172 .with_known("statistic", Kind::bytes()),
173 )
174 .or_undefined(),
175 None,
176 )
177 .with_event_field(
178 &owned_value_path!("aggregated_histogram"),
179 Kind::object(
180 Collection::empty()
181 .with_known(
182 "buckets",
183 Kind::array(
184 Collection::empty().with_unknown(Kind::object(
185 Collection::empty()
186 .with_known("upper_limit", Kind::float())
187 .with_known("count", Kind::integer()),
188 )),
189 ),
190 )
191 .with_known("count", Kind::integer())
192 .with_known("sum", Kind::float()),
193 )
194 .or_undefined(),
195 None,
196 )
197 .with_event_field(
198 &owned_value_path!("aggregated_summary"),
199 Kind::object(
200 Collection::empty()
201 .with_known(
202 "quantiles",
203 Kind::array(
204 Collection::empty().with_unknown(Kind::object(
205 Collection::empty()
206 .with_known("quantile", Kind::float())
207 .with_known("value", Kind::float()),
208 )),
209 ),
210 )
211 .with_known("count", Kind::integer())
212 .with_known("sum", Kind::float()),
213 )
214 .or_undefined(),
215 None,
216 )
217 .with_event_field(
218 &owned_value_path!("sketch"),
219 Kind::any().or_undefined(),
220 None,
221 );
222
223 match log_namespace {
224 LogNamespace::Vector => {
225 schema_definition = schema_definition.with_event_field(
227 &owned_value_path!("timestamp"),
228 Kind::bytes().or_undefined(),
229 None,
230 );
231
232 schema_definition = schema_definition.with_metadata_field(
235 &owned_value_path!("vector"),
236 Kind::object(Collection::empty()),
237 None,
238 );
239 }
240 LogNamespace::Legacy => {
241 if let Some(timestamp_key) = log_schema().timestamp_key() {
242 schema_definition =
243 schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None);
244 }
245
246 schema_definition = schema_definition.with_event_field(
247 log_schema().host_key().expect("valid host key"),
248 Kind::bytes().or_undefined(),
249 None,
250 );
251 }
252 }
253 schema_definition
254}
255
/// Runtime state of the `metric_to_log` transform, built from [`MetricToLogConfig`].
#[derive(Clone, Debug)]
pub struct MetricToLog {
    /// Event path (rooted at `tags`) of the tag whose value becomes the log's host field in the
    /// legacy namespace. `None` when no tag name was configured and no global host key exists.
    host_tag: Option<OwnedValuePath>,
    /// Time zone used to re-parse the serialized timestamp in the legacy namespace.
    timezone: TimeZone,
    /// Log namespace the produced events are shaped for.
    log_namespace: LogNamespace,
    /// Whether tags are reduced to a single value before serialization.
    tag_values: MetricTagValues,
}
263
264impl MetricToLog {
265 pub fn new(
266 host_tag: Option<&str>,
267 timezone: TimeZone,
268 log_namespace: LogNamespace,
269 tag_values: MetricTagValues,
270 ) -> Self {
271 Self {
272 host_tag: host_tag.map_or(
273 log_schema().host_key().cloned().map(|mut key| {
274 key.push_front_field("tags");
275 key
276 }),
277 |host| Some(owned_value_path!("tags", host)),
278 ),
279 timezone,
280 log_namespace,
281 tag_values,
282 }
283 }
284
285 pub fn transform_one(&self, mut metric: Metric) -> Option<LogEvent> {
286 if self.tag_values == MetricTagValues::Single {
287 metric.reduce_tags_to_single();
288 }
289 serde_json::to_value(&metric)
290 .map_err(|error| emit!(MetricToLogSerializeError { error }))
291 .ok()
292 .and_then(|value| match value {
293 Value::Object(object) => {
294 let (_, _, metadata) = metric.into_parts();
295 let mut log = LogEvent::new_with_metadata(metadata);
296
297 for (key, value) in object {
299 log.insert(event_path!(&key), value);
300 }
301
302 if self.log_namespace == LogNamespace::Legacy {
303 let timestamp = log
306 .remove(event_path!("timestamp"))
307 .and_then(|value| {
308 Conversion::Timestamp(self.timezone)
309 .convert(value.coerce_to_bytes())
310 .ok()
311 })
312 .unwrap_or_else(|| event::Value::Timestamp(Utc::now()));
313
314 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
315
316 if let Some(host_tag) = &self.host_tag
317 && let Some(host_value) =
318 log.remove_prune((PathPrefix::Event, host_tag), true)
319 {
320 log.maybe_insert(log_schema().host_key_target_path(), host_value);
321 }
322 }
323 if self.log_namespace == LogNamespace::Vector {
324 log.insert(
327 (PathPrefix::Metadata, path!("vector")),
328 vrl::value::Value::Object(BTreeMap::new()),
329 );
330 }
331 Some(log)
332 }
333 _ => None,
334 })
335 }
336}
337
338impl FunctionTransform for MetricToLog {
339 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
340 let retval: Option<Event> = self
341 .transform_one(event.into_metric())
342 .map(|log| log.into());
343 output.extend(retval.into_iter())
344 }
345}
346
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
    use futures::executor::block_on;
    use proptest::prelude::*;
    use similar_asserts::assert_eq;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{config::ComponentKey, event::EventMetadata, metric_tags};

    use super::*;
    use crate::{
        event::{
            KeyString, Metric, Value,
            metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet},
        },
        test_util::{components::assert_transform_compliance, random_string},
        transforms::test::create_topology,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<MetricToLogConfig>();
    }

    /// Runs `metric` through a topology containing a `metric_to_log` transform configured with
    /// `host_tag = "host"` and `log_namespace = false`, and returns the resulting log event.
    async fn do_transform(metric: Metric) -> Option<LogEvent> {
        assert_transform_compliance(async move {
            let config = MetricToLogConfig {
                host_tag: Some("host".into()),
                timezone: None,
                log_namespace: Some(false),
                ..Default::default()
            };
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            tx.send(metric.into()).await.unwrap();

            let result = out.recv().await;

            // Closing the input and stopping the topology must not yield further events.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);

            result
        })
        .await
        .map(|e| e.into_log())
    }

    /// Fixed timestamp used by all tests (with a non-zero nanosecond component).
    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp")
    }

    /// Tags including a `host` tag, which the transform promotes to the top-level host field.
    fn tags() -> MetricTags {
        metric_tags! {
            "host" => "localhost",
            "some_tag" => "some_value",
        }
    }

    fn event_metadata() -> EventMetadata {
        EventMetadata::default().with_source_type("unit_test_stream")
    }

    #[tokio::test]
    async fn transform_counter() {
        let counter = Metric::new_with_metadata(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
            event_metadata(),
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(ts()));
        // Expected metadata: the input metadata plus the ids and schema definition the test
        // topology attaches.
        let mut metadata = counter.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(counter).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        // The `host` tag has moved out of `tags` into the top-level `host` field.
        assert_eq!(
            collected,
            vec![
                (KeyString::from("counter.value"), &Value::from(1.0)),
                (KeyString::from("host"), &Value::from("localhost")),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("counter")),
                (KeyString::from("tags.some_tag"), &Value::from("some_value")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_gauge() {
        let gauge = Metric::new_with_metadata(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.0 },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let mut metadata = gauge.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(gauge).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (KeyString::from("gauge.value"), &Value::from(1.0)),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("gauge")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_set() {
        let set = Metric::new_with_metadata(
            "set",
            MetricKind::Absolute,
            MetricValue::Set {
                values: vec!["one".into(), "two".into()].into_iter().collect(),
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let mut metadata = set.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(set).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("set")),
                (KeyString::from("set.values[0]"), &Value::from("one")),
                (KeyString::from("set.values[1]"), &Value::from("two")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_distribution() {
        let distro = Metric::new_with_metadata(
            "distro",
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 10, 2.0 => 20],
                statistic: StatisticKind::Histogram,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let mut metadata = distro.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(distro).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("distribution.samples[0].rate"),
                    &Value::from(10)
                ),
                (
                    KeyString::from("distribution.samples[0].value"),
                    &Value::from(1.0)
                ),
                (
                    KeyString::from("distribution.samples[1].rate"),
                    &Value::from(20)
                ),
                (
                    KeyString::from("distribution.samples[1].value"),
                    &Value::from(2.0)
                ),
                (
                    KeyString::from("distribution.statistic"),
                    &Value::from("histogram")
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("distro")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_histogram() {
        let histo = Metric::new_with_metadata(
            "histo",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: vector_lib::buckets![1.0 => 10, 2.0 => 20],
                count: 30,
                sum: 50.0,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let mut metadata = histo.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(histo).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("aggregated_histogram.buckets[0].count"),
                    &Value::from(10)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[0].upper_limit"),
                    &Value::from(1.0)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[1].count"),
                    &Value::from(20)
                ),
                (
                    KeyString::from("aggregated_histogram.buckets[1].upper_limit"),
                    &Value::from(2.0)
                ),
                (
                    KeyString::from("aggregated_histogram.count"),
                    &Value::from(30)
                ),
                (
                    KeyString::from("aggregated_histogram.sum"),
                    &Value::from(50.0)
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("histo")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    #[tokio::test]
    async fn transform_summary() {
        let summary = Metric::new_with_metadata(
            "summary",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![50.0 => 10.0, 90.0 => 20.0],
                count: 30,
                sum: 50.0,
            },
            event_metadata(),
        )
        .with_timestamp(Some(ts()));
        let mut metadata = summary.metadata().clone();
        metadata.set_source_id(Arc::new(ComponentKey::from("in")));
        metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
        metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));

        let log = do_transform(summary).await.unwrap();
        let collected: Vec<_> = log.all_event_fields().unwrap().collect();

        assert_eq!(
            collected,
            vec![
                (
                    KeyString::from("aggregated_summary.count"),
                    &Value::from(30)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[0].quantile"),
                    &Value::from(50.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[0].value"),
                    &Value::from(10.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[1].quantile"),
                    &Value::from(90.0)
                ),
                (
                    KeyString::from("aggregated_summary.quantiles[1].value"),
                    &Value::from(20.0)
                ),
                (
                    KeyString::from("aggregated_summary.sum"),
                    &Value::from(50.0)
                ),
                (KeyString::from("kind"), &Value::from("absolute")),
                (KeyString::from("name"), &Value::from("summary")),
                (KeyString::from("timestamp"), &Value::from(ts())),
            ]
        );
        assert_eq!(log.metadata(), &metadata);
    }

    proptest! {
        // `single` encoding: the tag becomes whatever `TagValueSet::into_single` yields.
        #[test]
        fn transform_tag_single_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Single,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            let value = values.into_single().map(|value| Value::Bytes(value.into()));
            assert_eq!(tags.get(&*name), value.as_ref());
        }

        // `full` encoding: no entry for zero values, a plain value for one, an array for more.
        #[test]
        fn transform_tag_full_encoding(values: TagValueSet) {
            let name = random_string(16);
            let tags = block_on(transform_tags(
                MetricTagValues::Full,
                values.iter()
                    .map(|value| (name.clone(), TagValue::from(value.map(String::from))))
                    .collect(),
            ));
            let tag = tags.get(&*name);
            match values.len() {
                // No values: the tag is absent.
                0 => assert_eq!(tag, None),
                // Exactly one value: stored as that value (bare tags map to null).
                1 => assert_eq!(tag, Some(&tag_to_value(values.into_iter().next().unwrap()))),
                // Several values: stored as an array of values.
                _ => assert_eq!(tag, Some(&Value::Array(values.into_iter().map(tag_to_value).collect()))),
            }
        }
    }

    /// Converts a tag value to its expected log value (`None`, i.e. a bare tag, becomes null).
    fn tag_to_value(tag: TagValue) -> Value {
        tag.into_option().into()
    }

    /// Runs a counter carrying `tags` through the transform directly (no topology) and returns
    /// the resulting log's `tags` field.
    async fn transform_tags(metric_tag_values: MetricTagValues, tags: MetricTags) -> Value {
        let counter = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        )
        .with_tags(Some(tags))
        .with_timestamp(Some(ts()));

        let mut output = OutputBuffer::with_capacity(1);

        MetricToLogConfig {
            metric_tag_values,
            ..Default::default()
        }
        .build_transform(&TransformContext::default())
        .transform(&mut output, counter.into());

        assert_eq!(output.len(), 1);
        output.into_events().next().unwrap().into_log()["tags"].clone()
    }
}