use chrono::Utc;
use serde_json::Value;
use std::collections::{BTreeMap, BTreeSet};
use vector_lib::codecs::MetricTagValues;
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{event_path, owned_value_path, path, PathPrefix};
use vector_lib::TimeZone;
use vrl::path::OwnedValuePath;
use vrl::value::kind::Collection;
use vrl::value::Kind;
use crate::config::OutputId;
use crate::{
config::{
log_schema, DataType, GenerateConfig, Input, TransformConfig, TransformContext,
TransformOutput,
},
event::{self, Event, LogEvent, Metric},
internal_events::MetricToLogSerializeError,
schema::Definition,
transforms::{FunctionTransform, OutputBuffer, Transform},
types::Conversion,
};
#[configurable_component(transform("metric_to_log", "Convert metric events to log events."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct MetricToLogConfig {
#[configurable(metadata(docs::examples = "host", docs::examples = "hostname"))]
pub host_tag: Option<String>,
pub timezone: Option<TimeZone>,
#[serde(default)]
#[configurable(metadata(docs::hidden))]
pub log_namespace: Option<bool>,
#[serde(default)]
pub metric_tag_values: MetricTagValues,
}
impl MetricToLogConfig {
pub fn build_transform(&self, context: &TransformContext) -> MetricToLog {
MetricToLog::new(
self.host_tag.as_deref(),
self.timezone.unwrap_or_else(|| context.globals.timezone()),
context.log_namespace(self.log_namespace),
self.metric_tag_values,
)
}
}
impl GenerateConfig for MetricToLogConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
host_tag: Some("host-tag".to_string()),
timezone: None,
log_namespace: None,
metric_tag_values: MetricTagValues::Single,
})
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "metric_to_log")]
impl TransformConfig for MetricToLogConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
Ok(Transform::function(self.build_transform(context)))
}
fn input(&self) -> Input {
Input::metric()
}
fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, Definition)],
global_log_namespace: LogNamespace,
) -> Vec<TransformOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = schema_definition(log_namespace);
vec![TransformOutput::new(
DataType::Log,
input_definitions
.iter()
.map(|(output, _)| (output.clone(), schema_definition.clone()))
.collect(),
)]
}
fn enable_concurrency(&self) -> bool {
true
}
}
fn schema_definition(log_namespace: LogNamespace) -> Definition {
let mut schema_definition = Definition::default_for_namespace(&BTreeSet::from([log_namespace]))
.with_event_field(&owned_value_path!("name"), Kind::bytes(), None)
.with_event_field(
&owned_value_path!("namespace"),
Kind::bytes().or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("tags"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
)
.with_event_field(&owned_value_path!("kind"), Kind::bytes(), None)
.with_event_field(
&owned_value_path!("counter"),
Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("gauge"),
Kind::object(Collection::empty().with_known("value", Kind::float())).or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("set"),
Kind::object(Collection::empty().with_known(
"values",
Kind::array(Collection::empty().with_unknown(Kind::bytes())),
))
.or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("distribution"),
Kind::object(
Collection::empty()
.with_known(
"samples",
Kind::array(
Collection::empty().with_unknown(Kind::object(
Collection::empty()
.with_known("value", Kind::float())
.with_known("rate", Kind::integer()),
)),
),
)
.with_known("statistic", Kind::bytes()),
)
.or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("aggregated_histogram"),
Kind::object(
Collection::empty()
.with_known(
"buckets",
Kind::array(
Collection::empty().with_unknown(Kind::object(
Collection::empty()
.with_known("upper_limit", Kind::float())
.with_known("count", Kind::integer()),
)),
),
)
.with_known("count", Kind::integer())
.with_known("sum", Kind::float()),
)
.or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("aggregated_summary"),
Kind::object(
Collection::empty()
.with_known(
"quantiles",
Kind::array(
Collection::empty().with_unknown(Kind::object(
Collection::empty()
.with_known("quantile", Kind::float())
.with_known("value", Kind::float()),
)),
),
)
.with_known("count", Kind::integer())
.with_known("sum", Kind::float()),
)
.or_undefined(),
None,
)
.with_event_field(
&owned_value_path!("sketch"),
Kind::any().or_undefined(),
None,
);
match log_namespace {
LogNamespace::Vector => {
schema_definition = schema_definition.with_event_field(
&owned_value_path!("timestamp"),
Kind::bytes().or_undefined(),
None,
);
schema_definition = schema_definition.with_metadata_field(
&owned_value_path!("vector"),
Kind::object(Collection::empty()),
None,
);
}
LogNamespace::Legacy => {
if let Some(timestamp_key) = log_schema().timestamp_key() {
schema_definition =
schema_definition.with_event_field(timestamp_key, Kind::timestamp(), None);
}
schema_definition = schema_definition.with_event_field(
log_schema().host_key().expect("valid host key"),
Kind::bytes().or_undefined(),
None,
);
}
}
schema_definition
}
#[derive(Clone, Debug)]
pub struct MetricToLog {
host_tag: Option<OwnedValuePath>,
timezone: TimeZone,
log_namespace: LogNamespace,
tag_values: MetricTagValues,
}
impl MetricToLog {
pub fn new(
host_tag: Option<&str>,
timezone: TimeZone,
log_namespace: LogNamespace,
tag_values: MetricTagValues,
) -> Self {
Self {
host_tag: host_tag.map_or(
log_schema().host_key().cloned().map(|mut key| {
key.push_front_field("tags");
key
}),
|host| Some(owned_value_path!("tags", host)),
),
timezone,
log_namespace,
tag_values,
}
}
pub fn transform_one(&self, mut metric: Metric) -> Option<LogEvent> {
if self.tag_values == MetricTagValues::Single {
metric.reduce_tags_to_single();
}
serde_json::to_value(&metric)
.map_err(|error| emit!(MetricToLogSerializeError { error }))
.ok()
.and_then(|value| match value {
Value::Object(object) => {
let (_, _, metadata) = metric.into_parts();
let mut log = LogEvent::new_with_metadata(metadata);
for (key, value) in object {
log.insert(event_path!(&key), value);
}
if self.log_namespace == LogNamespace::Legacy {
let timestamp = log
.remove(event_path!("timestamp"))
.and_then(|value| {
Conversion::Timestamp(self.timezone)
.convert(value.coerce_to_bytes())
.ok()
})
.unwrap_or_else(|| event::Value::Timestamp(Utc::now()));
log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
if let Some(host_tag) = &self.host_tag {
if let Some(host_value) =
log.remove_prune((PathPrefix::Event, host_tag), true)
{
log.maybe_insert(log_schema().host_key_target_path(), host_value);
}
}
}
if self.log_namespace == LogNamespace::Vector {
log.insert(
(PathPrefix::Metadata, path!("vector")),
vrl::value::Value::Object(BTreeMap::new()),
);
}
Some(log)
}
_ => None,
})
}
}
impl FunctionTransform for MetricToLog {
fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
let retval: Option<Event> = self
.transform_one(event.into_metric())
.map(|log| log.into());
output.extend(retval.into_iter())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
use futures::executor::block_on;
use proptest::prelude::*;
use similar_asserts::assert_eq;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use vector_lib::config::ComponentKey;
use vector_lib::{event::EventMetadata, metric_tags};
use super::*;
use crate::event::{
metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet},
KeyString, Metric, Value,
};
use crate::test_util::{components::assert_transform_compliance, random_string};
use crate::transforms::test::create_topology;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<MetricToLogConfig>();
}
async fn do_transform(metric: Metric) -> Option<LogEvent> {
assert_transform_compliance(async move {
let config = MetricToLogConfig {
host_tag: Some("host".into()),
timezone: None,
log_namespace: Some(false),
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
tx.send(metric.into()).await.unwrap();
let result = out.recv().await;
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
result
})
.await
.map(|e| e.into_log())
}
fn ts() -> DateTime<Utc> {
Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
.single()
.and_then(|t| t.with_nanosecond(11))
.expect("invalid timestamp")
}
fn tags() -> MetricTags {
metric_tags! {
"host" => "localhost",
"some_tag" => "some_value",
}
}
fn event_metadata() -> EventMetadata {
EventMetadata::default().with_source_type("unit_test_stream")
}
#[tokio::test]
async fn transform_counter() {
let counter = Metric::new_with_metadata(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
event_metadata(),
)
.with_tags(Some(tags()))
.with_timestamp(Some(ts()));
let mut metadata = counter.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(counter).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(KeyString::from("counter.value"), &Value::from(1.0)),
(KeyString::from("host"), &Value::from("localhost")),
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("counter")),
(KeyString::from("tags.some_tag"), &Value::from("some_value")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
#[tokio::test]
async fn transform_gauge() {
let gauge = Metric::new_with_metadata(
"gauge",
MetricKind::Absolute,
MetricValue::Gauge { value: 1.0 },
event_metadata(),
)
.with_timestamp(Some(ts()));
let mut metadata = gauge.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(gauge).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(KeyString::from("gauge.value"), &Value::from(1.0)),
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("gauge")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
#[tokio::test]
async fn transform_set() {
let set = Metric::new_with_metadata(
"set",
MetricKind::Absolute,
MetricValue::Set {
values: vec!["one".into(), "two".into()].into_iter().collect(),
},
event_metadata(),
)
.with_timestamp(Some(ts()));
let mut metadata = set.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(set).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("set")),
(KeyString::from("set.values[0]"), &Value::from("one")),
(KeyString::from("set.values[1]"), &Value::from("two")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
#[tokio::test]
async fn transform_distribution() {
let distro = Metric::new_with_metadata(
"distro",
MetricKind::Absolute,
MetricValue::Distribution {
samples: vector_lib::samples![1.0 => 10, 2.0 => 20],
statistic: StatisticKind::Histogram,
},
event_metadata(),
)
.with_timestamp(Some(ts()));
let mut metadata = distro.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(distro).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(
KeyString::from("distribution.samples[0].rate"),
&Value::from(10)
),
(
KeyString::from("distribution.samples[0].value"),
&Value::from(1.0)
),
(
KeyString::from("distribution.samples[1].rate"),
&Value::from(20)
),
(
KeyString::from("distribution.samples[1].value"),
&Value::from(2.0)
),
(
KeyString::from("distribution.statistic"),
&Value::from("histogram")
),
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("distro")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
#[tokio::test]
async fn transform_histogram() {
let histo = Metric::new_with_metadata(
"histo",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
buckets: vector_lib::buckets![1.0 => 10, 2.0 => 20],
count: 30,
sum: 50.0,
},
event_metadata(),
)
.with_timestamp(Some(ts()));
let mut metadata = histo.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(histo).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(
KeyString::from("aggregated_histogram.buckets[0].count"),
&Value::from(10)
),
(
KeyString::from("aggregated_histogram.buckets[0].upper_limit"),
&Value::from(1.0)
),
(
KeyString::from("aggregated_histogram.buckets[1].count"),
&Value::from(20)
),
(
KeyString::from("aggregated_histogram.buckets[1].upper_limit"),
&Value::from(2.0)
),
(
KeyString::from("aggregated_histogram.count"),
&Value::from(30)
),
(
KeyString::from("aggregated_histogram.sum"),
&Value::from(50.0)
),
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("histo")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
#[tokio::test]
async fn transform_summary() {
let summary = Metric::new_with_metadata(
"summary",
MetricKind::Absolute,
MetricValue::AggregatedSummary {
quantiles: vector_lib::quantiles![50.0 => 10.0, 90.0 => 20.0],
count: 30,
sum: 50.0,
},
event_metadata(),
)
.with_timestamp(Some(ts()));
let mut metadata = summary.metadata().clone();
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_schema_definition(&Arc::new(schema_definition(LogNamespace::Legacy)));
let log = do_transform(summary).await.unwrap();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
(
KeyString::from("aggregated_summary.count"),
&Value::from(30)
),
(
KeyString::from("aggregated_summary.quantiles[0].quantile"),
&Value::from(50.0)
),
(
KeyString::from("aggregated_summary.quantiles[0].value"),
&Value::from(10.0)
),
(
KeyString::from("aggregated_summary.quantiles[1].quantile"),
&Value::from(90.0)
),
(
KeyString::from("aggregated_summary.quantiles[1].value"),
&Value::from(20.0)
),
(
KeyString::from("aggregated_summary.sum"),
&Value::from(50.0)
),
(KeyString::from("kind"), &Value::from("absolute")),
(KeyString::from("name"), &Value::from("summary")),
(KeyString::from("timestamp"), &Value::from(ts())),
]
);
assert_eq!(log.metadata(), &metadata);
}
proptest! {
#[test]
fn transform_tag_single_encoding(values: TagValueSet) {
let name = random_string(16);
let tags = block_on(transform_tags(
MetricTagValues::Single,
values.iter()
.map(|value| (name.clone(), TagValue::from(value.map(String::from))))
.collect(),
));
let value = values.into_single().map(|value| Value::Bytes(value.into()));
assert_eq!(tags.get(&*name), value.as_ref());
}
#[test]
fn transform_tag_full_encoding(values: TagValueSet) {
let name = random_string(16);
let tags = block_on(transform_tags(
MetricTagValues::Full,
values.iter()
.map(|value| (name.clone(), TagValue::from(value.map(String::from))))
.collect(),
));
let tag = tags.get(&*name);
match values.len() {
0 => assert_eq!(tag, None),
1 => assert_eq!(tag, Some(&tag_to_value(values.into_iter().next().unwrap()))),
_ => assert_eq!(tag, Some(&Value::Array(values.into_iter().map(tag_to_value).collect()))),
}
}
}
fn tag_to_value(tag: TagValue) -> Value {
tag.into_option().into()
}
async fn transform_tags(metric_tag_values: MetricTagValues, tags: MetricTags) -> Value {
let counter = Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
)
.with_tags(Some(tags))
.with_timestamp(Some(ts()));
let mut output = OutputBuffer::with_capacity(1);
MetricToLogConfig {
metric_tag_values,
..Default::default()
}
.build_transform(&TransformContext::default())
.transform(&mut output, counter.into());
assert_eq!(output.len(), 1);
output.into_events().next().unwrap().into_log()["tags"].clone()
}
}