opentelemetry_proto/
metrics.rs

1use chrono::{TimeZone, Utc};
2use vector_core::event::{
3    Event, Metric as MetricEvent, MetricKind, MetricTags, MetricValue,
4    metric::{Bucket, Quantile, TagValue},
5};
6
7use super::proto::{
8    common::v1::{InstrumentationScope, KeyValue},
9    metrics::v1::{
10        AggregationTemporality, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
11        Histogram, HistogramDataPoint, NumberDataPoint, ResourceMetrics, Sum, Summary,
12        SummaryDataPoint, metric::Data, number_data_point::Value as NumberDataPointValue,
13    },
14    resource::v1::Resource,
15};
16
17impl ResourceMetrics {
18    pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
19        let resource = self.resource.clone();
20
21        self.scope_metrics
22            .into_iter()
23            .flat_map(move |scope_metrics| {
24                let scope = scope_metrics.scope;
25                let resource = resource.clone();
26
27                scope_metrics.metrics.into_iter().flat_map(move |metric| {
28                    let metric_name = metric.name.clone();
29                    match metric.data {
30                        Some(Data::Gauge(g)) => {
31                            Self::convert_gauge(g, &resource, &scope, &metric_name)
32                        }
33                        Some(Data::Sum(s)) => Self::convert_sum(s, &resource, &scope, &metric_name),
34                        Some(Data::Histogram(h)) => {
35                            Self::convert_histogram(h, &resource, &scope, &metric_name)
36                        }
37                        Some(Data::ExponentialHistogram(e)) => {
38                            Self::convert_exp_histogram(e, &resource, &scope, &metric_name)
39                        }
40                        Some(Data::Summary(su)) => {
41                            Self::convert_summary(su, &resource, &scope, &metric_name)
42                        }
43                        _ => Vec::new(),
44                    }
45                })
46            })
47    }
48
49    fn convert_gauge(
50        gauge: Gauge,
51        resource: &Option<Resource>,
52        scope: &Option<InstrumentationScope>,
53        metric_name: &str,
54    ) -> Vec<Event> {
55        let resource = resource.clone();
56        let scope = scope.clone();
57        let metric_name = metric_name.to_string();
58
59        gauge
60            .data_points
61            .into_iter()
62            .map(move |point| {
63                GaugeMetric {
64                    resource: resource.clone(),
65                    scope: scope.clone(),
66                    point,
67                }
68                .into_metric(metric_name.clone())
69            })
70            .collect()
71    }
72
73    fn convert_sum(
74        sum: Sum,
75        resource: &Option<Resource>,
76        scope: &Option<InstrumentationScope>,
77        metric_name: &str,
78    ) -> Vec<Event> {
79        let resource = resource.clone();
80        let scope = scope.clone();
81        let metric_name = metric_name.to_string();
82
83        sum.data_points
84            .into_iter()
85            .map(move |point| {
86                SumMetric {
87                    aggregation_temporality: sum.aggregation_temporality,
88                    resource: resource.clone(),
89                    scope: scope.clone(),
90                    is_monotonic: sum.is_monotonic,
91                    point,
92                }
93                .into_metric(metric_name.clone())
94            })
95            .collect()
96    }
97
98    fn convert_histogram(
99        histogram: Histogram,
100        resource: &Option<Resource>,
101        scope: &Option<InstrumentationScope>,
102        metric_name: &str,
103    ) -> Vec<Event> {
104        let resource = resource.clone();
105        let scope = scope.clone();
106        let metric_name = metric_name.to_string();
107
108        histogram
109            .data_points
110            .into_iter()
111            .map(move |point| {
112                HistogramMetric {
113                    aggregation_temporality: histogram.aggregation_temporality,
114                    resource: resource.clone(),
115                    scope: scope.clone(),
116                    point,
117                }
118                .into_metric(metric_name.clone())
119            })
120            .collect()
121    }
122
123    fn convert_exp_histogram(
124        histogram: ExponentialHistogram,
125        resource: &Option<Resource>,
126        scope: &Option<InstrumentationScope>,
127        metric_name: &str,
128    ) -> Vec<Event> {
129        let resource = resource.clone();
130        let scope = scope.clone();
131        let metric_name = metric_name.to_string();
132
133        histogram
134            .data_points
135            .into_iter()
136            .map(move |point| {
137                ExpHistogramMetric {
138                    aggregation_temporality: histogram.aggregation_temporality,
139                    resource: resource.clone(),
140                    scope: scope.clone(),
141                    point,
142                }
143                .into_metric(metric_name.clone())
144            })
145            .collect()
146    }
147
148    fn convert_summary(
149        summary: Summary,
150        resource: &Option<Resource>,
151        scope: &Option<InstrumentationScope>,
152        metric_name: &str,
153    ) -> Vec<Event> {
154        let resource = resource.clone();
155        let scope = scope.clone();
156        let metric_name = metric_name.to_string();
157
158        summary
159            .data_points
160            .into_iter()
161            .map(move |point| {
162                SummaryMetric {
163                    resource: resource.clone(),
164                    scope: scope.clone(),
165                    point,
166                }
167                .into_metric(metric_name.clone())
168            })
169            .collect()
170    }
171}
172
173struct GaugeMetric {
174    resource: Option<Resource>,
175    scope: Option<InstrumentationScope>,
176    point: NumberDataPoint,
177}
178
179struct SumMetric {
180    aggregation_temporality: i32,
181    resource: Option<Resource>,
182    scope: Option<InstrumentationScope>,
183    point: NumberDataPoint,
184    is_monotonic: bool,
185}
186
187struct SummaryMetric {
188    resource: Option<Resource>,
189    scope: Option<InstrumentationScope>,
190    point: SummaryDataPoint,
191}
192
193struct HistogramMetric {
194    aggregation_temporality: i32,
195    resource: Option<Resource>,
196    scope: Option<InstrumentationScope>,
197    point: HistogramDataPoint,
198}
199
200struct ExpHistogramMetric {
201    aggregation_temporality: i32,
202    resource: Option<Resource>,
203    scope: Option<InstrumentationScope>,
204    point: ExponentialHistogramDataPoint,
205}
206
207pub fn build_metric_tags(
208    resource: Option<Resource>,
209    scope: Option<InstrumentationScope>,
210    attributes: &[KeyValue],
211) -> MetricTags {
212    let mut tags = MetricTags::default();
213
214    if let Some(res) = resource {
215        for attr in res.attributes {
216            if let Some(value) = &attr.value
217                && let Some(pb_value) = &value.value
218            {
219                tags.insert(
220                    format!("resource.{}", attr.key.clone()),
221                    TagValue::from(pb_value.clone()),
222                );
223            }
224        }
225    }
226
227    if let Some(scope) = scope {
228        if !scope.name.is_empty() {
229            tags.insert("scope.name".to_string(), scope.name);
230        }
231        if !scope.version.is_empty() {
232            tags.insert("scope.version".to_string(), scope.version);
233        }
234        for attr in scope.attributes {
235            if let Some(value) = &attr.value
236                && let Some(pb_value) = &value.value
237            {
238                tags.insert(
239                    format!("scope.{}", attr.key.clone()),
240                    TagValue::from(pb_value.clone()),
241                );
242            }
243        }
244    }
245
246    for attr in attributes {
247        if let Some(value) = &attr.value
248            && let Some(pb_value) = &value.value
249        {
250            tags.insert(attr.key.clone(), TagValue::from(pb_value.clone()));
251        }
252    }
253
254    tags
255}
256
257impl SumMetric {
258    fn into_metric(self, metric_name: String) -> Event {
259        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
260        let value = self.point.value.to_f64().unwrap_or(0.0);
261        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
262        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
263            MetricKind::Incremental
264        } else {
265            MetricKind::Absolute
266        };
267
268        // as per otel doc non_monotonic sum would be better transformed to gauge in time-series
269        let metric_value = if self.is_monotonic {
270            MetricValue::Counter { value }
271        } else {
272            MetricValue::Gauge { value }
273        };
274
275        MetricEvent::new(metric_name, kind, metric_value)
276            .with_tags(Some(attributes))
277            .with_timestamp(timestamp)
278            .into()
279    }
280}
281
282impl GaugeMetric {
283    fn into_metric(self, metric_name: String) -> Event {
284        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
285        let value = self.point.value.to_f64().unwrap_or(0.0);
286        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
287
288        MetricEvent::new(
289            metric_name,
290            MetricKind::Absolute,
291            MetricValue::Gauge { value },
292        )
293        .with_timestamp(timestamp)
294        .with_tags(Some(attributes))
295        .into()
296    }
297}
298
299impl HistogramMetric {
300    fn into_metric(self, metric_name: String) -> Event {
301        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
302        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
303        let buckets = match self.point.bucket_counts.len() {
304            0 => Vec::new(),
305            n => {
306                let mut buckets = Vec::with_capacity(n);
307
308                for (i, &count) in self.point.bucket_counts.iter().enumerate() {
309                    // there are n+1 buckets, since we have -Inf, +Inf on the sides
310                    let upper_limit = self
311                        .point
312                        .explicit_bounds
313                        .get(i)
314                        .copied()
315                        .unwrap_or(f64::INFINITY);
316                    buckets.push(Bucket { count, upper_limit });
317                }
318
319                buckets
320            }
321        };
322
323        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
324            MetricKind::Incremental
325        } else {
326            MetricKind::Absolute
327        };
328
329        MetricEvent::new(
330            metric_name,
331            kind,
332            MetricValue::AggregatedHistogram {
333                buckets,
334                count: self.point.count,
335                sum: self.point.sum.unwrap_or(0.0),
336            },
337        )
338        .with_timestamp(timestamp)
339        .with_tags(Some(attributes))
340        .into()
341    }
342}
343
344impl ExpHistogramMetric {
345    fn into_metric(self, metric_name: String) -> Event {
346        // we have to convert Exponential Histogram to agg histogram using scale and base
347        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
348        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
349
350        let scale = self.point.scale;
351        // from Opentelemetry docs: base = 2**(2**(-scale))
352        let base = 2f64.powf(2f64.powi(-scale));
353
354        let mut buckets = Vec::new();
355
356        if let Some(negative_buckets) = self.point.negative {
357            for (i, &count) in negative_buckets.bucket_counts.iter().enumerate() {
358                let index = negative_buckets.offset + i as i32;
359                let upper_limit = -base.powi(index);
360                buckets.push(Bucket { count, upper_limit });
361            }
362        }
363
364        if self.point.zero_count > 0 {
365            buckets.push(Bucket {
366                count: self.point.zero_count,
367                upper_limit: 0.0,
368            });
369        }
370
371        if let Some(positive_buckets) = self.point.positive {
372            for (i, &count) in positive_buckets.bucket_counts.iter().enumerate() {
373                let index = positive_buckets.offset + i as i32;
374                let upper_limit = base.powi(index + 1);
375                buckets.push(Bucket { count, upper_limit });
376            }
377        }
378
379        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
380            MetricKind::Incremental
381        } else {
382            MetricKind::Absolute
383        };
384
385        MetricEvent::new(
386            metric_name,
387            kind,
388            MetricValue::AggregatedHistogram {
389                buckets,
390                count: self.point.count,
391                sum: self.point.sum.unwrap_or(0.0),
392            },
393        )
394        .with_timestamp(timestamp)
395        .with_tags(Some(attributes))
396        .into()
397    }
398}
399
400impl SummaryMetric {
401    fn into_metric(self, metric_name: String) -> Event {
402        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
403        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
404
405        let quantiles: Vec<Quantile> = self
406            .point
407            .quantile_values
408            .iter()
409            .map(|q| Quantile {
410                quantile: q.quantile,
411                value: q.value,
412            })
413            .collect();
414
415        MetricEvent::new(
416            metric_name,
417            MetricKind::Absolute,
418            MetricValue::AggregatedSummary {
419                quantiles,
420                count: self.point.count,
421                sum: self.point.sum,
422            },
423        )
424        .with_timestamp(timestamp)
425        .with_tags(Some(attributes))
426        .into()
427    }
428}
429
/// Conversion of a value into an optional `f64`.
pub trait ToF64 {
    /// Consumes `self` and returns it as an `f64`, or `None` when the
    /// implementation has no value to convert.
    fn to_f64(self) -> Option<f64>;
}
433
434impl ToF64 for Option<NumberDataPointValue> {
435    fn to_f64(self) -> Option<f64> {
436        match self {
437            Some(NumberDataPointValue::AsDouble(f)) => Some(f),
438            Some(NumberDataPointValue::AsInt(i)) => Some(i as f64),
439            None => None,
440        }
441    }
442}