opentelemetry_proto/
metrics.rs

1use super::proto::{
2    common::v1::{InstrumentationScope, KeyValue},
3    metrics::v1::{
4        metric::Data, number_data_point::Value as NumberDataPointValue, AggregationTemporality,
5        ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint,
6        NumberDataPoint, ResourceMetrics, Sum, Summary, SummaryDataPoint,
7    },
8    resource::v1::Resource,
9};
10use chrono::{TimeZone, Utc};
11use vector_core::event::{
12    metric::{Bucket, Quantile, TagValue},
13    Event, Metric as MetricEvent, MetricKind, MetricTags, MetricValue,
14};
15
16impl ResourceMetrics {
17    pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
18        let resource = self.resource.clone();
19
20        self.scope_metrics
21            .into_iter()
22            .flat_map(move |scope_metrics| {
23                let scope = scope_metrics.scope;
24                let resource = resource.clone();
25
26                scope_metrics.metrics.into_iter().flat_map(move |metric| {
27                    let metric_name = metric.name.clone();
28                    match metric.data {
29                        Some(Data::Gauge(g)) => {
30                            Self::convert_gauge(g, &resource, &scope, &metric_name)
31                        }
32                        Some(Data::Sum(s)) => Self::convert_sum(s, &resource, &scope, &metric_name),
33                        Some(Data::Histogram(h)) => {
34                            Self::convert_histogram(h, &resource, &scope, &metric_name)
35                        }
36                        Some(Data::ExponentialHistogram(e)) => {
37                            Self::convert_exp_histogram(e, &resource, &scope, &metric_name)
38                        }
39                        Some(Data::Summary(su)) => {
40                            Self::convert_summary(su, &resource, &scope, &metric_name)
41                        }
42                        _ => Vec::new(),
43                    }
44                })
45            })
46    }
47
48    fn convert_gauge(
49        gauge: Gauge,
50        resource: &Option<Resource>,
51        scope: &Option<InstrumentationScope>,
52        metric_name: &str,
53    ) -> Vec<Event> {
54        let resource = resource.clone();
55        let scope = scope.clone();
56        let metric_name = metric_name.to_string();
57
58        gauge
59            .data_points
60            .into_iter()
61            .map(move |point| {
62                GaugeMetric {
63                    resource: resource.clone(),
64                    scope: scope.clone(),
65                    point,
66                }
67                .into_metric(metric_name.clone())
68            })
69            .collect()
70    }
71
72    fn convert_sum(
73        sum: Sum,
74        resource: &Option<Resource>,
75        scope: &Option<InstrumentationScope>,
76        metric_name: &str,
77    ) -> Vec<Event> {
78        let resource = resource.clone();
79        let scope = scope.clone();
80        let metric_name = metric_name.to_string();
81
82        sum.data_points
83            .into_iter()
84            .map(move |point| {
85                SumMetric {
86                    aggregation_temporality: sum.aggregation_temporality,
87                    resource: resource.clone(),
88                    scope: scope.clone(),
89                    is_monotonic: sum.is_monotonic,
90                    point,
91                }
92                .into_metric(metric_name.clone())
93            })
94            .collect()
95    }
96
97    fn convert_histogram(
98        histogram: Histogram,
99        resource: &Option<Resource>,
100        scope: &Option<InstrumentationScope>,
101        metric_name: &str,
102    ) -> Vec<Event> {
103        let resource = resource.clone();
104        let scope = scope.clone();
105        let metric_name = metric_name.to_string();
106
107        histogram
108            .data_points
109            .into_iter()
110            .map(move |point| {
111                HistogramMetric {
112                    aggregation_temporality: histogram.aggregation_temporality,
113                    resource: resource.clone(),
114                    scope: scope.clone(),
115                    point,
116                }
117                .into_metric(metric_name.clone())
118            })
119            .collect()
120    }
121
122    fn convert_exp_histogram(
123        histogram: ExponentialHistogram,
124        resource: &Option<Resource>,
125        scope: &Option<InstrumentationScope>,
126        metric_name: &str,
127    ) -> Vec<Event> {
128        let resource = resource.clone();
129        let scope = scope.clone();
130        let metric_name = metric_name.to_string();
131
132        histogram
133            .data_points
134            .into_iter()
135            .map(move |point| {
136                ExpHistogramMetric {
137                    aggregation_temporality: histogram.aggregation_temporality,
138                    resource: resource.clone(),
139                    scope: scope.clone(),
140                    point,
141                }
142                .into_metric(metric_name.clone())
143            })
144            .collect()
145    }
146
147    fn convert_summary(
148        summary: Summary,
149        resource: &Option<Resource>,
150        scope: &Option<InstrumentationScope>,
151        metric_name: &str,
152    ) -> Vec<Event> {
153        let resource = resource.clone();
154        let scope = scope.clone();
155        let metric_name = metric_name.to_string();
156
157        summary
158            .data_points
159            .into_iter()
160            .map(move |point| {
161                SummaryMetric {
162                    resource: resource.clone(),
163                    scope: scope.clone(),
164                    point,
165                }
166                .into_metric(metric_name.clone())
167            })
168            .collect()
169    }
170}
171
172struct GaugeMetric {
173    resource: Option<Resource>,
174    scope: Option<InstrumentationScope>,
175    point: NumberDataPoint,
176}
177
178struct SumMetric {
179    aggregation_temporality: i32,
180    resource: Option<Resource>,
181    scope: Option<InstrumentationScope>,
182    point: NumberDataPoint,
183    is_monotonic: bool,
184}
185
186struct SummaryMetric {
187    resource: Option<Resource>,
188    scope: Option<InstrumentationScope>,
189    point: SummaryDataPoint,
190}
191
192struct HistogramMetric {
193    aggregation_temporality: i32,
194    resource: Option<Resource>,
195    scope: Option<InstrumentationScope>,
196    point: HistogramDataPoint,
197}
198
199struct ExpHistogramMetric {
200    aggregation_temporality: i32,
201    resource: Option<Resource>,
202    scope: Option<InstrumentationScope>,
203    point: ExponentialHistogramDataPoint,
204}
205
206pub fn build_metric_tags(
207    resource: Option<Resource>,
208    scope: Option<InstrumentationScope>,
209    attributes: &[KeyValue],
210) -> MetricTags {
211    let mut tags = MetricTags::default();
212
213    if let Some(res) = resource {
214        for attr in res.attributes {
215            if let Some(value) = &attr.value {
216                if let Some(pb_value) = &value.value {
217                    tags.insert(
218                        format!("resource.{}", attr.key.clone()),
219                        TagValue::from(pb_value.clone()),
220                    );
221                }
222            }
223        }
224    }
225
226    if let Some(scope) = scope {
227        if !scope.name.is_empty() {
228            tags.insert("scope.name".to_string(), scope.name);
229        }
230        if !scope.version.is_empty() {
231            tags.insert("scope.version".to_string(), scope.version);
232        }
233        for attr in scope.attributes {
234            if let Some(value) = &attr.value {
235                if let Some(pb_value) = &value.value {
236                    tags.insert(
237                        format!("scope.{}", attr.key.clone()),
238                        TagValue::from(pb_value.clone()),
239                    );
240                }
241            }
242        }
243    }
244
245    for attr in attributes {
246        if let Some(value) = &attr.value {
247            if let Some(pb_value) = &value.value {
248                tags.insert(attr.key.clone(), TagValue::from(pb_value.clone()));
249            }
250        }
251    }
252
253    tags
254}
255
256impl SumMetric {
257    fn into_metric(self, metric_name: String) -> Event {
258        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
259        let value = self.point.value.to_f64().unwrap_or(0.0);
260        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
261        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
262            MetricKind::Incremental
263        } else {
264            MetricKind::Absolute
265        };
266
267        // as per otel doc non_monotonic sum would be better transformed to gauge in time-series
268        let metric_value = if self.is_monotonic {
269            MetricValue::Counter { value }
270        } else {
271            MetricValue::Gauge { value }
272        };
273
274        MetricEvent::new(metric_name, kind, metric_value)
275            .with_tags(Some(attributes))
276            .with_timestamp(timestamp)
277            .into()
278    }
279}
280
281impl GaugeMetric {
282    fn into_metric(self, metric_name: String) -> Event {
283        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
284        let value = self.point.value.to_f64().unwrap_or(0.0);
285        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
286
287        MetricEvent::new(
288            metric_name,
289            MetricKind::Absolute,
290            MetricValue::Gauge { value },
291        )
292        .with_timestamp(timestamp)
293        .with_tags(Some(attributes))
294        .into()
295    }
296}
297
298impl HistogramMetric {
299    fn into_metric(self, metric_name: String) -> Event {
300        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
301        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
302        let buckets = match self.point.bucket_counts.len() {
303            0 => Vec::new(),
304            n => {
305                let mut buckets = Vec::with_capacity(n);
306
307                for (i, &count) in self.point.bucket_counts.iter().enumerate() {
308                    // there are n+1 buckets, since we have -Inf, +Inf on the sides
309                    let upper_limit = self
310                        .point
311                        .explicit_bounds
312                        .get(i)
313                        .copied()
314                        .unwrap_or(f64::INFINITY);
315                    buckets.push(Bucket { count, upper_limit });
316                }
317
318                buckets
319            }
320        };
321
322        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
323            MetricKind::Incremental
324        } else {
325            MetricKind::Absolute
326        };
327
328        MetricEvent::new(
329            metric_name,
330            kind,
331            MetricValue::AggregatedHistogram {
332                buckets,
333                count: self.point.count,
334                sum: self.point.sum.unwrap_or(0.0),
335            },
336        )
337        .with_timestamp(timestamp)
338        .with_tags(Some(attributes))
339        .into()
340    }
341}
342
343impl ExpHistogramMetric {
344    fn into_metric(self, metric_name: String) -> Event {
345        // we have to convert Exponential Histogram to agg histogram using scale and base
346        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
347        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
348
349        let scale = self.point.scale;
350        // from Opentelemetry docs: base = 2**(2**(-scale))
351        let base = 2f64.powf(2f64.powi(-scale));
352
353        let mut buckets = Vec::new();
354
355        if let Some(negative_buckets) = self.point.negative {
356            for (i, &count) in negative_buckets.bucket_counts.iter().enumerate() {
357                let index = negative_buckets.offset + i as i32;
358                let upper_limit = -base.powi(index);
359                buckets.push(Bucket { count, upper_limit });
360            }
361        }
362
363        if self.point.zero_count > 0 {
364            buckets.push(Bucket {
365                count: self.point.zero_count,
366                upper_limit: 0.0,
367            });
368        }
369
370        if let Some(positive_buckets) = self.point.positive {
371            for (i, &count) in positive_buckets.bucket_counts.iter().enumerate() {
372                let index = positive_buckets.offset + i as i32;
373                let upper_limit = base.powi(index + 1);
374                buckets.push(Bucket { count, upper_limit });
375            }
376        }
377
378        let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
379            MetricKind::Incremental
380        } else {
381            MetricKind::Absolute
382        };
383
384        MetricEvent::new(
385            metric_name,
386            kind,
387            MetricValue::AggregatedHistogram {
388                buckets,
389                count: self.point.count,
390                sum: self.point.sum.unwrap_or(0.0),
391            },
392        )
393        .with_timestamp(timestamp)
394        .with_tags(Some(attributes))
395        .into()
396    }
397}
398
399impl SummaryMetric {
400    fn into_metric(self, metric_name: String) -> Event {
401        let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
402        let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
403
404        let quantiles: Vec<Quantile> = self
405            .point
406            .quantile_values
407            .iter()
408            .map(|q| Quantile {
409                quantile: q.quantile,
410                value: q.value,
411            })
412            .collect();
413
414        MetricEvent::new(
415            metric_name,
416            MetricKind::Absolute,
417            MetricValue::AggregatedSummary {
418                quantiles,
419                count: self.point.count,
420                sum: self.point.sum,
421            },
422        )
423        .with_timestamp(timestamp)
424        .with_tags(Some(attributes))
425        .into()
426    }
427}
428
/// Conversion of a value into an optional `f64`.
pub trait ToF64 {
    /// Consumes `self` and returns it as an `f64`, or `None` when there is
    /// nothing to convert.
    fn to_f64(self) -> Option<f64>;
}
432
433impl ToF64 for Option<NumberDataPointValue> {
434    fn to_f64(self) -> Option<f64> {
435        match self {
436            Some(NumberDataPointValue::AsDouble(f)) => Some(f),
437            Some(NumberDataPointValue::AsInt(i)) => Some(i as f64),
438            None => None,
439        }
440    }
441}