1use super::proto::{
2 common::v1::{InstrumentationScope, KeyValue},
3 metrics::v1::{
4 metric::Data, number_data_point::Value as NumberDataPointValue, AggregationTemporality,
5 ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, HistogramDataPoint,
6 NumberDataPoint, ResourceMetrics, Sum, Summary, SummaryDataPoint,
7 },
8 resource::v1::Resource,
9};
10use chrono::{TimeZone, Utc};
11use vector_core::event::{
12 metric::{Bucket, Quantile, TagValue},
13 Event, Metric as MetricEvent, MetricKind, MetricTags, MetricValue,
14};
15
16impl ResourceMetrics {
17 pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
18 let resource = self.resource.clone();
19
20 self.scope_metrics
21 .into_iter()
22 .flat_map(move |scope_metrics| {
23 let scope = scope_metrics.scope;
24 let resource = resource.clone();
25
26 scope_metrics.metrics.into_iter().flat_map(move |metric| {
27 let metric_name = metric.name.clone();
28 match metric.data {
29 Some(Data::Gauge(g)) => {
30 Self::convert_gauge(g, &resource, &scope, &metric_name)
31 }
32 Some(Data::Sum(s)) => Self::convert_sum(s, &resource, &scope, &metric_name),
33 Some(Data::Histogram(h)) => {
34 Self::convert_histogram(h, &resource, &scope, &metric_name)
35 }
36 Some(Data::ExponentialHistogram(e)) => {
37 Self::convert_exp_histogram(e, &resource, &scope, &metric_name)
38 }
39 Some(Data::Summary(su)) => {
40 Self::convert_summary(su, &resource, &scope, &metric_name)
41 }
42 _ => Vec::new(),
43 }
44 })
45 })
46 }
47
48 fn convert_gauge(
49 gauge: Gauge,
50 resource: &Option<Resource>,
51 scope: &Option<InstrumentationScope>,
52 metric_name: &str,
53 ) -> Vec<Event> {
54 let resource = resource.clone();
55 let scope = scope.clone();
56 let metric_name = metric_name.to_string();
57
58 gauge
59 .data_points
60 .into_iter()
61 .map(move |point| {
62 GaugeMetric {
63 resource: resource.clone(),
64 scope: scope.clone(),
65 point,
66 }
67 .into_metric(metric_name.clone())
68 })
69 .collect()
70 }
71
72 fn convert_sum(
73 sum: Sum,
74 resource: &Option<Resource>,
75 scope: &Option<InstrumentationScope>,
76 metric_name: &str,
77 ) -> Vec<Event> {
78 let resource = resource.clone();
79 let scope = scope.clone();
80 let metric_name = metric_name.to_string();
81
82 sum.data_points
83 .into_iter()
84 .map(move |point| {
85 SumMetric {
86 aggregation_temporality: sum.aggregation_temporality,
87 resource: resource.clone(),
88 scope: scope.clone(),
89 is_monotonic: sum.is_monotonic,
90 point,
91 }
92 .into_metric(metric_name.clone())
93 })
94 .collect()
95 }
96
97 fn convert_histogram(
98 histogram: Histogram,
99 resource: &Option<Resource>,
100 scope: &Option<InstrumentationScope>,
101 metric_name: &str,
102 ) -> Vec<Event> {
103 let resource = resource.clone();
104 let scope = scope.clone();
105 let metric_name = metric_name.to_string();
106
107 histogram
108 .data_points
109 .into_iter()
110 .map(move |point| {
111 HistogramMetric {
112 aggregation_temporality: histogram.aggregation_temporality,
113 resource: resource.clone(),
114 scope: scope.clone(),
115 point,
116 }
117 .into_metric(metric_name.clone())
118 })
119 .collect()
120 }
121
122 fn convert_exp_histogram(
123 histogram: ExponentialHistogram,
124 resource: &Option<Resource>,
125 scope: &Option<InstrumentationScope>,
126 metric_name: &str,
127 ) -> Vec<Event> {
128 let resource = resource.clone();
129 let scope = scope.clone();
130 let metric_name = metric_name.to_string();
131
132 histogram
133 .data_points
134 .into_iter()
135 .map(move |point| {
136 ExpHistogramMetric {
137 aggregation_temporality: histogram.aggregation_temporality,
138 resource: resource.clone(),
139 scope: scope.clone(),
140 point,
141 }
142 .into_metric(metric_name.clone())
143 })
144 .collect()
145 }
146
147 fn convert_summary(
148 summary: Summary,
149 resource: &Option<Resource>,
150 scope: &Option<InstrumentationScope>,
151 metric_name: &str,
152 ) -> Vec<Event> {
153 let resource = resource.clone();
154 let scope = scope.clone();
155 let metric_name = metric_name.to_string();
156
157 summary
158 .data_points
159 .into_iter()
160 .map(move |point| {
161 SummaryMetric {
162 resource: resource.clone(),
163 scope: scope.clone(),
164 point,
165 }
166 .into_metric(metric_name.clone())
167 })
168 .collect()
169 }
170}
171
172struct GaugeMetric {
173 resource: Option<Resource>,
174 scope: Option<InstrumentationScope>,
175 point: NumberDataPoint,
176}
177
178struct SumMetric {
179 aggregation_temporality: i32,
180 resource: Option<Resource>,
181 scope: Option<InstrumentationScope>,
182 point: NumberDataPoint,
183 is_monotonic: bool,
184}
185
186struct SummaryMetric {
187 resource: Option<Resource>,
188 scope: Option<InstrumentationScope>,
189 point: SummaryDataPoint,
190}
191
192struct HistogramMetric {
193 aggregation_temporality: i32,
194 resource: Option<Resource>,
195 scope: Option<InstrumentationScope>,
196 point: HistogramDataPoint,
197}
198
199struct ExpHistogramMetric {
200 aggregation_temporality: i32,
201 resource: Option<Resource>,
202 scope: Option<InstrumentationScope>,
203 point: ExponentialHistogramDataPoint,
204}
205
206pub fn build_metric_tags(
207 resource: Option<Resource>,
208 scope: Option<InstrumentationScope>,
209 attributes: &[KeyValue],
210) -> MetricTags {
211 let mut tags = MetricTags::default();
212
213 if let Some(res) = resource {
214 for attr in res.attributes {
215 if let Some(value) = &attr.value {
216 if let Some(pb_value) = &value.value {
217 tags.insert(
218 format!("resource.{}", attr.key.clone()),
219 TagValue::from(pb_value.clone()),
220 );
221 }
222 }
223 }
224 }
225
226 if let Some(scope) = scope {
227 if !scope.name.is_empty() {
228 tags.insert("scope.name".to_string(), scope.name);
229 }
230 if !scope.version.is_empty() {
231 tags.insert("scope.version".to_string(), scope.version);
232 }
233 for attr in scope.attributes {
234 if let Some(value) = &attr.value {
235 if let Some(pb_value) = &value.value {
236 tags.insert(
237 format!("scope.{}", attr.key.clone()),
238 TagValue::from(pb_value.clone()),
239 );
240 }
241 }
242 }
243 }
244
245 for attr in attributes {
246 if let Some(value) = &attr.value {
247 if let Some(pb_value) = &value.value {
248 tags.insert(attr.key.clone(), TagValue::from(pb_value.clone()));
249 }
250 }
251 }
252
253 tags
254}
255
256impl SumMetric {
257 fn into_metric(self, metric_name: String) -> Event {
258 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
259 let value = self.point.value.to_f64().unwrap_or(0.0);
260 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
261 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
262 MetricKind::Incremental
263 } else {
264 MetricKind::Absolute
265 };
266
267 let metric_value = if self.is_monotonic {
269 MetricValue::Counter { value }
270 } else {
271 MetricValue::Gauge { value }
272 };
273
274 MetricEvent::new(metric_name, kind, metric_value)
275 .with_tags(Some(attributes))
276 .with_timestamp(timestamp)
277 .into()
278 }
279}
280
281impl GaugeMetric {
282 fn into_metric(self, metric_name: String) -> Event {
283 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
284 let value = self.point.value.to_f64().unwrap_or(0.0);
285 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
286
287 MetricEvent::new(
288 metric_name,
289 MetricKind::Absolute,
290 MetricValue::Gauge { value },
291 )
292 .with_timestamp(timestamp)
293 .with_tags(Some(attributes))
294 .into()
295 }
296}
297
298impl HistogramMetric {
299 fn into_metric(self, metric_name: String) -> Event {
300 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
301 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
302 let buckets = match self.point.bucket_counts.len() {
303 0 => Vec::new(),
304 n => {
305 let mut buckets = Vec::with_capacity(n);
306
307 for (i, &count) in self.point.bucket_counts.iter().enumerate() {
308 let upper_limit = self
310 .point
311 .explicit_bounds
312 .get(i)
313 .copied()
314 .unwrap_or(f64::INFINITY);
315 buckets.push(Bucket { count, upper_limit });
316 }
317
318 buckets
319 }
320 };
321
322 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
323 MetricKind::Incremental
324 } else {
325 MetricKind::Absolute
326 };
327
328 MetricEvent::new(
329 metric_name,
330 kind,
331 MetricValue::AggregatedHistogram {
332 buckets,
333 count: self.point.count,
334 sum: self.point.sum.unwrap_or(0.0),
335 },
336 )
337 .with_timestamp(timestamp)
338 .with_tags(Some(attributes))
339 .into()
340 }
341}
342
343impl ExpHistogramMetric {
344 fn into_metric(self, metric_name: String) -> Event {
345 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
347 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
348
349 let scale = self.point.scale;
350 let base = 2f64.powf(2f64.powi(-scale));
352
353 let mut buckets = Vec::new();
354
355 if let Some(negative_buckets) = self.point.negative {
356 for (i, &count) in negative_buckets.bucket_counts.iter().enumerate() {
357 let index = negative_buckets.offset + i as i32;
358 let upper_limit = -base.powi(index);
359 buckets.push(Bucket { count, upper_limit });
360 }
361 }
362
363 if self.point.zero_count > 0 {
364 buckets.push(Bucket {
365 count: self.point.zero_count,
366 upper_limit: 0.0,
367 });
368 }
369
370 if let Some(positive_buckets) = self.point.positive {
371 for (i, &count) in positive_buckets.bucket_counts.iter().enumerate() {
372 let index = positive_buckets.offset + i as i32;
373 let upper_limit = base.powi(index + 1);
374 buckets.push(Bucket { count, upper_limit });
375 }
376 }
377
378 let kind = if self.aggregation_temporality == AggregationTemporality::Delta as i32 {
379 MetricKind::Incremental
380 } else {
381 MetricKind::Absolute
382 };
383
384 MetricEvent::new(
385 metric_name,
386 kind,
387 MetricValue::AggregatedHistogram {
388 buckets,
389 count: self.point.count,
390 sum: self.point.sum.unwrap_or(0.0),
391 },
392 )
393 .with_timestamp(timestamp)
394 .with_tags(Some(attributes))
395 .into()
396 }
397}
398
399impl SummaryMetric {
400 fn into_metric(self, metric_name: String) -> Event {
401 let timestamp = Some(Utc.timestamp_nanos(self.point.time_unix_nano as i64));
402 let attributes = build_metric_tags(self.resource, self.scope, &self.point.attributes);
403
404 let quantiles: Vec<Quantile> = self
405 .point
406 .quantile_values
407 .iter()
408 .map(|q| Quantile {
409 quantile: q.quantile,
410 value: q.value,
411 })
412 .collect();
413
414 MetricEvent::new(
415 metric_name,
416 MetricKind::Absolute,
417 MetricValue::AggregatedSummary {
418 quantiles,
419 count: self.point.count,
420 sum: self.point.sum,
421 },
422 )
423 .with_timestamp(timestamp)
424 .with_tags(Some(attributes))
425 .into()
426 }
427}
428
429pub trait ToF64 {
430 fn to_f64(self) -> Option<f64>;
431}
432
433impl ToF64 for Option<NumberDataPointValue> {
434 fn to_f64(self) -> Option<f64> {
435 match self {
436 Some(NumberDataPointValue::AsDouble(f)) => Some(f),
437 Some(NumberDataPointValue::AsInt(i)) => Some(i as f64),
438 None => None,
439 }
440 }
441}