use super::proto::{
common::v1::{InstrumentationScope, KeyValue},
metrics::v1::{
metric::Data, number_data_point::Value as NumberDataPointValue, AggregationTemporality,
ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint,
NumberDataPoint, ResourceMetrics, Sum, Summary, SummaryDataPoint,
},
resource::v1::Resource,
};
use chrono::{TimeZone, Utc};
use vector_core::event::{
metric::{Bucket, Quantile, TagValue},
Event, Metric as MetricEvent, MetricKind, MetricTags, MetricValue,
};
impl ResourceMetrics {
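    /// Flattens this OTLP payload into Vector events, emitting one event per
    /// data point across every scope and metric under the resource.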
pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
let resource = self.resource.clone();
self.scope_metrics
.into_iter()
.flat_map(move |scope_metrics| {
let scope = scope_metrics.scope;
let resource = resource.clone();
scope_metrics.metrics.into_iter().flat_map(move |metric| {
let metric_name = metric.name.clone();
match metric.data {
Some(Data::Gauge(g)) => {
Self::convert_gauge(g, &resource, &scope, &metric_name)
}
Some(Data::Sum(s)) => Self::convert_sum(s, &resource, &scope, &metric_name),
Some(Data::Histogram(h)) => {
Self::convert_histogram(h, &resource, &scope, &metric_name)
}
Some(Data::ExponentialHistogram(e)) => {
Self::convert_exp_histogram(e, &resource, &scope, &metric_name)
}
Some(Data::Summary(su)) => {
Self::convert_summary(su, &resource, &scope, &metric_name)
}
_ => Vec::new(),
}
})
})
}
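    /// Converts each gauge data point into an absolute Vector gauge.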
fn convert_gauge(
gauge: Gauge,
resource: &Option<Resource>,
scope: &Option<InstrumentationScope>,
metric_name: &str,
) -> Vec<Event> {
let resource = resource.clone();
let scope = scope.clone();
let metric_name = metric_name.to_string();
gauge
.data_points
.into_iter()
.map(move |point| {
GaugeMetric {
resource: resource.clone(),
scope: scope.clone(),
point,
}
.into_metric(metric_name.clone())
})
.collect()
}
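    /// Converts each sum data point into a counter (monotonic) or gauge
    /// (non-monotonic), preserving the aggregation temporality.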
fn convert_sum(
sum: Sum,
resource: &Option<Resource>,
scope: &Option<InstrumentationScope>,
metric_name: &str,
) -> Vec<Event> {
let resource = resource.clone();
let scope = scope.clone();
let metric_name = metric_name.to_string();
sum.data_points
.into_iter()
.map(move |point| {
SumMetric {
aggregation_temporality: sum.aggregation_temporality,
resource: resource.clone(),
scope: scope.clone(),
is_monotonic: sum.is_monotonic,
point,
}
.into_metric(metric_name.clone())
})
.collect()
}
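    /// Converts each explicit-bucket histogram data point into an aggregated
    /// histogram.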
fn convert_histogram(
histogram: Histogram,
resource: &Option<Resource>,
scope: &Option<InstrumentationScope>,
metric_name: &str,
) -> Vec<Event> {
let resource = resource.clone();
let scope = scope.clone();
let metric_name = metric_name.to_string();
histogram
.data_points
.into_iter()
.map(move |point| {
HistogramMetric {
aggregation_temporality: histogram.aggregation_temporality,
resource: resource.clone(),
scope: scope.clone(),
point,
}
.into_metric(metric_name.clone())
})
.collect()
}
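    /// Converts each exponential histogram data point into an aggregated
    /// histogram whose bucket bounds are derived from the point's scale.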
fn convert_exp_histogram(
histogram: ExponentialHistogram,
resource: &Option<Resource>,
scope: &Option<InstrumentationScope>,
metric_name: &str,
) -> Vec<Event> {
let resource = resource.clone();
let scope = scope.clone();
let metric_name = metric_name.to_string();
histogram
.data_points
.into_iter()
.map(move |point| {
ExpHistogramMetric {
aggregation_temporality: histogram.aggregation_temporality,
resource: resource.clone(),
scope: scope.clone(),
point,
}
.into_metric(metric_name.clone())
})
.collect()
}
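    /// Converts each summary data point into an aggregated summary of
    /// quantiles.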
fn convert_summary(
summary: Summary,
resource: &Option<Resource>,
scope: &Option<InstrumentationScope>,
metric_name: &str,
) -> Vec<Event> {
let resource = resource.clone();
let scope = scope.clone();
let metric_name = metric_name.to_string();
summary
.data_points
.into_iter()
.map(move |point| {
SummaryMetric {
resource: resource.clone(),
scope: scope.clone(),
point,
}
.into_metric(metric_name.clone())
})
.collect()
}
}
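// Per-data-point wrappers that keep the originating resource and scope
// alongside the point so the event's tags can be built in one place.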
struct GaugeMetric {
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
point: NumberDataPoint,
}
struct SumMetric {
aggregation_temporality: i32,
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
point: NumberDataPoint,
is_monotonic: bool,
}
struct SummaryMetric {
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
point: SummaryDataPoint,
}
struct HistogramMetric {
aggregation_temporality: i32,
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
point: HistogramDataPoint,
}
struct ExpHistogramMetric {
aggregation_temporality: i32,
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
point: ExponentialHistogramDataPoint,
}
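/// Builds the tag set for an event: resource attributes are prefixed with
/// `resource.`, the scope's name, version, and attributes with `scope.`, and
/// data point attributes keep their original keys.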
pub fn build_metric_tags(
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
attributes: &[KeyValue],
) -> MetricTags {
let mut tags = MetricTags::default();
if let Some(res) = resource {
for attr in res.attributes {
if let Some(value) = &attr.value {
if let Some(pb_value) = &value.value {
                    tags.insert(
                        format!("resource.{}", attr.key),
                        TagValue::from(pb_value.clone()),
                    );
}
}
}
}
if let Some(scope) = scope {
if !scope.name.is_empty() {
tags.insert("scope.name".to_string(), scope.name);
}
if !scope.version.is_empty() {
tags.insert("scope.version".to_string(), scope.version);
}
for attr in scope.attributes {
if let Some(value) = &attr.value {
if let Some(pb_value) = &value.value {
                    tags.insert(
                        format!("scope.{}", attr.key),
                        TagValue::from(pb_value.clone()),
                    );
}
}
}
}
for attr in attributes {
if let Some(value) = &attr.value {
if let Some(pb_value) = &value.value {
tags.insert(attr.key.clone(), TagValue::from(pb_value.clone()));
}
}
}
tags
}
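// Delta temporality becomes an incremental metric, anything else an absolute
// one; monotonic sums map to counters, non-monotonic sums to gauges.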
impl SumMetric {
fn into_metric(self, metric_name: String) -> Event {
let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
let value = self.point.value.to_f64().unwrap_or(0.0);
let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
MetricKind::Incremental
} else {
MetricKind::Absolute
};
let metric_value = if self.is_monotonic {
MetricValue::Counter { value }
} else {
MetricValue::Gauge { value }
};
MetricEvent::new(metric_name, kind, metric_value)
.with_tags(Some(attributes))
.with_timestamp(timestamp)
.into()
}
}
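// Gauges are always absolute; a data point without a value defaults to 0.0.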
impl GaugeMetric {
fn into_metric(self, metric_name: String) -> Event {
let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
let value = self.point.value.to_f64().unwrap_or(0.0);
let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
MetricEvent::new(
metric_name,
MetricKind::Absolute,
MetricValue::Gauge { value },
)
.with_timestamp(timestamp)
.with_tags(Some(attributes))
.into()
}
}
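// OTLP encodes one more bucket count than explicit bounds, so the final
// bucket is unbounded and gets an upper limit of +Inf.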
impl HistogramMetric {
fn into_metric(self, metric_name: String) -> Event {
let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
        let buckets: Vec<Bucket> = self
            .point
            .bucket_counts
            .iter()
            .enumerate()
            .map(|(i, &count)| Bucket {
                count,
                upper_limit: self
                    .point
                    .explicit_bounds
                    .get(i)
                    .copied()
                    .unwrap_or(f64::INFINITY),
            })
            .collect();
let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
MetricKind::Incremental
} else {
MetricKind::Absolute
};
MetricEvent::new(
metric_name,
kind,
MetricValue::AggregatedHistogram {
buckets,
count: self.point.count,
sum: self.point.sum.unwrap_or(0.0),
},
)
.with_timestamp(timestamp)
.with_tags(Some(attributes))
.into()
}
}
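// Bucket boundaries are powers of base = 2^(2^-scale): positive bucket
// `index` gets an upper limit of base^(index + 1), negative buckets are
// mirrored below zero, and a populated zero bucket gets an upper limit of
// 0.0.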
impl ExpHistogramMetric {
fn into_metric(self, metric_name: String) -> Event {
let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
let scale = self.point.scale;
let base = 2f64.powf(2f64.powi(-scale));
let mut buckets = Vec::new();
if let Some(negative_buckets) = self.point.negative {
for (i, &count) in negative_buckets.bucket_counts.iter().enumerate() {
let index = negative_buckets.offset + i as i32;
let upper_limit = -base.powi(index);
buckets.push(Bucket { count, upper_limit });
}
}
if self.point.zero_count > 0 {
buckets.push(Bucket {
count: self.point.zero_count,
upper_limit: 0.0,
});
}
if let Some(positive_buckets) = self.point.positive {
for (i, &count) in positive_buckets.bucket_counts.iter().enumerate() {
let index = positive_buckets.offset + i as i32;
let upper_limit = base.powi(index + 1);
buckets.push(Bucket { count, upper_limit });
}
}
let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
MetricKind::Incremental
} else {
MetricKind::Absolute
};
MetricEvent::new(
metric_name,
kind,
MetricValue::AggregatedHistogram {
buckets,
count: self.point.count,
sum: self.point.sum.unwrap_or(0.0),
},
)
.with_timestamp(timestamp)
.with_tags(Some(attributes))
.into()
}
}
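// Summaries are always absolute and carry the reported quantiles through
// unchanged.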
impl SummaryMetric {
fn into_metric(self, metric_name: String) -> Event {
let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
let quantiles: Vec<Quantile> = self
.point
.quantile_values
.iter()
.map(|q| Quantile {
quantile: q.quantile,
value: q.value,
})
.collect();
MetricEvent::new(
metric_name,
MetricKind::Absolute,
MetricValue::AggregatedSummary {
quantiles,
count: self.point.count,
sum: self.point.sum,
},
)
.with_timestamp(timestamp)
.with_tags(Some(attributes))
.into()
}
}
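/// Extracts the numeric value from an OTLP number data point, widening
/// integers to `f64`.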
pub trait ToF64 {
fn to_f64(self) -> Option<f64>;
}
impl ToF64 for Option<NumberDataPointValue> {
fn to_f64(self) -> Option<f64> {
match self {
Some(NumberDataPointValue::AsDouble(f)) => Some(f),
Some(NumberDataPointValue::AsInt(i)) => Some(i as f64),
None => None,
}
}
}