1use chrono::{TimeZone, Utc};
2use vector_core::event::{
3 Event, Metric as MetricEvent, MetricKind, MetricTags, MetricValue,
4 metric::{Bucket, Quantile, TagValue},
5};
6
7use super::proto::{
8 common::v1::{InstrumentationScope, KeyValue},
9 metrics::v1::{
10 AggregationTemporality, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
11 Histogram, HistogramDataPoint, NumberDataPoint, ResourceMetrics, Sum, Summary,
12 SummaryDataPoint, metric::Data, number_data_point::Value as NumberDataPointValue,
13 },
14 resource::v1::Resource,
15};
16
17impl ResourceMetrics {
18 pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
19 let resource = self.resource.clone();
20
21 self.scope_metrics
22 .into_iter()
23 .flat_map(move |scope_metrics| {
24 let scope = scope_metrics.scope;
25 let resource = resource.clone();
26
27 scope_metrics.metrics.into_iter().flat_map(move |metric| {
28 let metric_name = metric.name.clone();
29 match metric.data {
30 Some(Data::Gauge(g)) => {
31 Self::convert_gauge(g, &resource, &scope, &metric_name)
32 }
33 Some(Data::Sum(s)) => Self::convert_sum(s, &resource, &scope, &metric_name),
34 Some(Data::Histogram(h)) => {
35 Self::convert_histogram(h, &resource, &scope, &metric_name)
36 }
37 Some(Data::ExponentialHistogram(e)) => {
38 Self::convert_exp_histogram(e, &resource, &scope, &metric_name)
39 }
40 Some(Data::Summary(su)) => {
41 Self::convert_summary(su, &resource, &scope, &metric_name)
42 }
43 _ => Vec::new(),
44 }
45 })
46 })
47 }
48
49 fn convert_gauge(
50 gauge: Gauge,
51 resource: &Option<Resource>,
52 scope: &Option<InstrumentationScope>,
53 metric_name: &str,
54 ) -> Vec<Event> {
55 let resource = resource.clone();
56 let scope = scope.clone();
57 let metric_name = metric_name.to_string();
58
59 gauge
60 .data_points
61 .into_iter()
62 .map(move |point| {
63 GaugeMetric {
64 resource: resource.clone(),
65 scope: scope.clone(),
66 point,
67 }
68 .into_metric(metric_name.clone())
69 })
70 .collect()
71 }
72
73 fn convert_sum(
74 sum: Sum,
75 resource: &Option<Resource>,
76 scope: &Option<InstrumentationScope>,
77 metric_name: &str,
78 ) -> Vec<Event> {
79 let resource = resource.clone();
80 let scope = scope.clone();
81 let metric_name = metric_name.to_string();
82
83 sum.data_points
84 .into_iter()
85 .map(move |point| {
86 SumMetric {
87 aggregation_temporality: sum.aggregation_temporality,
88 resource: resource.clone(),
89 scope: scope.clone(),
90 is_monotonic: sum.is_monotonic,
91 point,
92 }
93 .into_metric(metric_name.clone())
94 })
95 .collect()
96 }
97
98 fn convert_histogram(
99 histogram: Histogram,
100 resource: &Option<Resource>,
101 scope: &Option<InstrumentationScope>,
102 metric_name: &str,
103 ) -> Vec<Event> {
104 let resource = resource.clone();
105 let scope = scope.clone();
106 let metric_name = metric_name.to_string();
107
108 histogram
109 .data_points
110 .into_iter()
111 .map(move |point| {
112 HistogramMetric {
113 aggregation_temporality: histogram.aggregation_temporality,
114 resource: resource.clone(),
115 scope: scope.clone(),
116 point,
117 }
118 .into_metric(metric_name.clone())
119 })
120 .collect()
121 }
122
123 fn convert_exp_histogram(
124 histogram: ExponentialHistogram,
125 resource: &Option<Resource>,
126 scope: &Option<InstrumentationScope>,
127 metric_name: &str,
128 ) -> Vec<Event> {
129 let resource = resource.clone();
130 let scope = scope.clone();
131 let metric_name = metric_name.to_string();
132
133 histogram
134 .data_points
135 .into_iter()
136 .map(move |point| {
137 ExpHistogramMetric {
138 aggregation_temporality: histogram.aggregation_temporality,
139 resource: resource.clone(),
140 scope: scope.clone(),
141 point,
142 }
143 .into_metric(metric_name.clone())
144 })
145 .collect()
146 }
147
148 fn convert_summary(
149 summary: Summary,
150 resource: &Option<Resource>,
151 scope: &Option<InstrumentationScope>,
152 metric_name: &str,
153 ) -> Vec<Event> {
154 let resource = resource.clone();
155 let scope = scope.clone();
156 let metric_name = metric_name.to_string();
157
158 summary
159 .data_points
160 .into_iter()
161 .map(move |point| {
162 SummaryMetric {
163 resource: resource.clone(),
164 scope: scope.clone(),
165 point,
166 }
167 .into_metric(metric_name.clone())
168 })
169 .collect()
170 }
171}
172
173struct GaugeMetric {
174 resource: Option<Resource>,
175 scope: Option<InstrumentationScope>,
176 point: NumberDataPoint,
177}
178
179struct SumMetric {
180 aggregation_temporality: i32,
181 resource: Option<Resource>,
182 scope: Option<InstrumentationScope>,
183 point: NumberDataPoint,
184 is_monotonic: bool,
185}
186
187struct SummaryMetric {
188 resource: Option<Resource>,
189 scope: Option<InstrumentationScope>,
190 point: SummaryDataPoint,
191}
192
193struct HistogramMetric {
194 aggregation_temporality: i32,
195 resource: Option<Resource>,
196 scope: Option<InstrumentationScope>,
197 point: HistogramDataPoint,
198}
199
200struct ExpHistogramMetric {
201 aggregation_temporality: i32,
202 resource: Option<Resource>,
203 scope: Option<InstrumentationScope>,
204 point: ExponentialHistogramDataPoint,
205}
206
207pub fn build_metric_tags(
208 resource: Option<Resource>,
209 scope: Option<InstrumentationScope>,
210 attributes: &[KeyValue],
211) -> MetricTags {
212 let mut tags = MetricTags::default();
213
214 if let Some(res) = resource {
215 for attr in res.attributes {
216 if let Some(value) = &attr.value
217 && let Some(pb_value) = &value.value
218 {
219 tags.insert(
220 format!("resource.{}", attr.key.clone()),
221 TagValue::from(pb_value.clone()),
222 );
223 }
224 }
225 }
226
227 if let Some(scope) = scope {
228 if !scope.name.is_empty() {
229 tags.insert("scope.name".to_string(), scope.name);
230 }
231 if !scope.version.is_empty() {
232 tags.insert("scope.version".to_string(), scope.version);
233 }
234 for attr in scope.attributes {
235 if let Some(value) = &attr.value
236 && let Some(pb_value) = &value.value
237 {
238 tags.insert(
239 format!("scope.{}", attr.key.clone()),
240 TagValue::from(pb_value.clone()),
241 );
242 }
243 }
244 }
245
246 for attr in attributes {
247 if let Some(value) = &attr.value
248 && let Some(pb_value) = &value.value
249 {
250 tags.insert(attr.key.clone(), TagValue::from(pb_value.clone()));
251 }
252 }
253
254 tags
255}
256
257impl SumMetric {
258 fn into_metric(self, metric_name: String) -> Event {
259 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
260 let value = self.point.value.to_f64().unwrap_or(0.0);
261 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
262 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
263 MetricKind::Incremental
264 } else {
265 MetricKind::Absolute
266 };
267
268 let metric_value = if self.is_monotonic {
270 MetricValue::Counter { value }
271 } else {
272 MetricValue::Gauge { value }
273 };
274
275 MetricEvent::new(metric_name, kind, metric_value)
276 .with_tags(Some(attributes))
277 .with_timestamp(timestamp)
278 .into()
279 }
280}
281
282impl GaugeMetric {
283 fn into_metric(self, metric_name: String) -> Event {
284 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
285 let value = self.point.value.to_f64().unwrap_or(0.0);
286 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
287
288 MetricEvent::new(
289 metric_name,
290 MetricKind::Absolute,
291 MetricValue::Gauge { value },
292 )
293 .with_timestamp(timestamp)
294 .with_tags(Some(attributes))
295 .into()
296 }
297}
298
299impl HistogramMetric {
300 fn into_metric(self, metric_name: String) -> Event {
301 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
302 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
303 let buckets = match self.point.bucket_counts.len() {
304 0 => Vec::new(),
305 n => {
306 let mut buckets = Vec::with_capacity(n);
307
308 for (i, &count) in self.point.bucket_counts.iter().enumerate() {
309 let upper_limit = self
311 .point
312 .explicit_bounds
313 .get(i)
314 .copied()
315 .unwrap_or(f64::INFINITY);
316 buckets.push(Bucket { count, upper_limit });
317 }
318
319 buckets
320 }
321 };
322
323 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
324 MetricKind::Incremental
325 } else {
326 MetricKind::Absolute
327 };
328
329 MetricEvent::new(
330 metric_name,
331 kind,
332 MetricValue::AggregatedHistogram {
333 buckets,
334 count: self.point.count,
335 sum: self.point.sum.unwrap_or(0.0),
336 },
337 )
338 .with_timestamp(timestamp)
339 .with_tags(Some(attributes))
340 .into()
341 }
342}
343
344impl ExpHistogramMetric {
345 fn into_metric(self, metric_name: String) -> Event {
346 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
348 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
349
350 let scale = self.point.scale;
351 let base = 2f64.powf(2f64.powi(-scale));
353
354 let mut buckets = Vec::new();
355
356 if let Some(negative_buckets) = self.point.negative {
357 for (i, &count) in negative_buckets.bucket_counts.iter().enumerate() {
358 let index = negative_buckets.offset + i as i32;
359 let upper_limit = -base.powi(index);
360 buckets.push(Bucket { count, upper_limit });
361 }
362 }
363
364 if self.point.zero_count > 0 {
365 buckets.push(Bucket {
366 count: self.point.zero_count,
367 upper_limit: 0.0,
368 });
369 }
370
371 if let Some(positive_buckets) = self.point.positive {
372 for (i, &count) in positive_buckets.bucket_counts.iter().enumerate() {
373 let index = positive_buckets.offset + i as i32;
374 let upper_limit = base.powi(index + 1);
375 buckets.push(Bucket { count, upper_limit });
376 }
377 }
378
379 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
380 MetricKind::Incremental
381 } else {
382 MetricKind::Absolute
383 };
384
385 MetricEvent::new(
386 metric_name,
387 kind,
388 MetricValue::AggregatedHistogram {
389 buckets,
390 count: self.point.count,
391 sum: self.point.sum.unwrap_or(0.0),
392 },
393 )
394 .with_timestamp(timestamp)
395 .with_tags(Some(attributes))
396 .into()
397 }
398}
399
400impl SummaryMetric {
401 fn into_metric(self, metric_name: String) -> Event {
402 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
403 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
404
405 let quantiles: Vec<Quantile> = self
406 .point
407 .quantile_values
408 .iter()
409 .map(|q| Quantile {
410 quantile: q.quantile,
411 value: q.value,
412 })
413 .collect();
414
415 MetricEvent::new(
416 metric_name,
417 MetricKind::Absolute,
418 MetricValue::AggregatedSummary {
419 quantiles,
420 count: self.point.count,
421 sum: self.point.sum,
422 },
423 )
424 .with_timestamp(timestamp)
425 .with_tags(Some(attributes))
426 .into()
427 }
428}
429
430pub trait ToF64 {
431 fn to_f64(self) -> Option<f64>;
432}
433
434impl ToF64 for Option<NumberDataPointValue> {
435 fn to_f64(self) -> Option<f64> {
436 match self {
437 Some(NumberDataPointValue::AsDouble(f)) => Some(f),
438 Some(NumberDataPointValue::AsInt(i)) => Some(i as f64),
439 None => None,
440 }
441 }
442}