1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use chrono::TimeZone;
5use ordered_float::NotNan;
6use uuid::Uuid;
7
8use super::{MetricTags, WithMetadata};
9use crate::{event, metrics::AgentDDSketch};
10
11#[allow(warnings, clippy::all, clippy::pedantic)]
12mod proto_event {
13 include!(concat!(env!("OUT_DIR"), "/event.rs"));
14}
15pub use event_wrapper::Event;
16pub use metric::Value as MetricValue;
17pub use proto_event::*;
18use vrl::value::{ObjectMap, Value as VrlValue};
19
20use super::{array, metric::MetricSketch, EventMetadata};
21
22impl event_array::Events {
23 fn from_logs(logs: array::LogArray) -> Self {
26 let logs = logs.into_iter().map(Into::into).collect();
27 Self::Logs(LogArray { logs })
28 }
29
30 fn from_metrics(metrics: array::MetricArray) -> Self {
31 let metrics = metrics.into_iter().map(Into::into).collect();
32 Self::Metrics(MetricArray { metrics })
33 }
34
35 fn from_traces(traces: array::TraceArray) -> Self {
36 let traces = traces.into_iter().map(Into::into).collect();
37 Self::Traces(TraceArray { traces })
38 }
39}
40
41impl From<array::EventArray> for EventArray {
42 fn from(events: array::EventArray) -> Self {
43 let events = Some(match events {
44 array::EventArray::Logs(array) => event_array::Events::from_logs(array),
45 array::EventArray::Metrics(array) => event_array::Events::from_metrics(array),
46 array::EventArray::Traces(array) => event_array::Events::from_traces(array),
47 });
48 Self { events }
49 }
50}
51
52impl From<EventArray> for array::EventArray {
53 fn from(events: EventArray) -> Self {
54 let events = events.events.unwrap();
55
56 match events {
57 event_array::Events::Logs(logs) => {
58 array::EventArray::Logs(logs.logs.into_iter().map(Into::into).collect())
59 }
60 event_array::Events::Metrics(metrics) => {
61 array::EventArray::Metrics(metrics.metrics.into_iter().map(Into::into).collect())
62 }
63 event_array::Events::Traces(traces) => {
64 array::EventArray::Traces(traces.traces.into_iter().map(Into::into).collect())
65 }
66 }
67 }
68}
69
70impl From<Event> for EventWrapper {
71 fn from(event: Event) -> Self {
72 Self { event: Some(event) }
73 }
74}
75
76impl From<Log> for Event {
77 fn from(log: Log) -> Self {
78 Self::Log(log)
79 }
80}
81
82impl From<Metric> for Event {
83 fn from(metric: Metric) -> Self {
84 Self::Metric(metric)
85 }
86}
87
88impl From<Trace> for Event {
89 fn from(trace: Trace) -> Self {
90 Self::Trace(trace)
91 }
92}
93
94impl From<Log> for super::LogEvent {
95 fn from(log: Log) -> Self {
96 #[allow(deprecated)]
97 let metadata = log
98 .metadata_full
99 .map(Into::into)
100 .or_else(|| {
101 log.metadata
102 .and_then(decode_value)
103 .map(EventMetadata::default_with_value)
104 })
105 .unwrap_or_default();
106
107 if let Some(value) = log.value {
108 Self::from_parts(decode_value(value).unwrap_or(VrlValue::Null), metadata)
109 } else {
110 let fields = log
112 .fields
113 .into_iter()
114 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
115 .collect::<ObjectMap>();
116
117 Self::from_map(fields, metadata)
118 }
119 }
120}
121
122impl From<Trace> for super::TraceEvent {
123 fn from(trace: Trace) -> Self {
124 #[allow(deprecated)]
125 let metadata = trace
126 .metadata_full
127 .map(Into::into)
128 .or_else(|| {
129 trace
130 .metadata
131 .and_then(decode_value)
132 .map(EventMetadata::default_with_value)
133 })
134 .unwrap_or_default();
135
136 let fields = trace
137 .fields
138 .into_iter()
139 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
140 .collect::<ObjectMap>();
141
142 Self::from(super::LogEvent::from_map(fields, metadata))
143 }
144}
145
146impl From<MetricValue> for super::MetricValue {
147 fn from(value: MetricValue) -> Self {
148 match value {
149 MetricValue::Counter(counter) => Self::Counter {
150 value: counter.value,
151 },
152 MetricValue::Gauge(gauge) => Self::Gauge { value: gauge.value },
153 MetricValue::Set(set) => Self::Set {
154 values: set.values.into_iter().collect(),
155 },
156 MetricValue::Distribution1(dist) => Self::Distribution {
157 statistic: dist.statistic().into(),
158 samples: super::metric::zip_samples(dist.values, dist.sample_rates),
159 },
160 MetricValue::Distribution2(dist) => Self::Distribution {
161 statistic: dist.statistic().into(),
162 samples: dist.samples.into_iter().map(Into::into).collect(),
163 },
164 MetricValue::AggregatedHistogram1(hist) => Self::AggregatedHistogram {
165 buckets: super::metric::zip_buckets(
166 hist.buckets,
167 hist.counts.iter().map(|h| u64::from(*h)),
168 ),
169 count: u64::from(hist.count),
170 sum: hist.sum,
171 },
172 MetricValue::AggregatedHistogram2(hist) => Self::AggregatedHistogram {
173 buckets: hist.buckets.into_iter().map(Into::into).collect(),
174 count: u64::from(hist.count),
175 sum: hist.sum,
176 },
177 MetricValue::AggregatedHistogram3(hist) => Self::AggregatedHistogram {
178 buckets: hist.buckets.into_iter().map(Into::into).collect(),
179 count: hist.count,
180 sum: hist.sum,
181 },
182 MetricValue::AggregatedSummary1(summary) => Self::AggregatedSummary {
183 quantiles: super::metric::zip_quantiles(summary.quantiles, summary.values),
184 count: u64::from(summary.count),
185 sum: summary.sum,
186 },
187 MetricValue::AggregatedSummary2(summary) => Self::AggregatedSummary {
188 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
189 count: u64::from(summary.count),
190 sum: summary.sum,
191 },
192 MetricValue::AggregatedSummary3(summary) => Self::AggregatedSummary {
193 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
194 count: summary.count,
195 sum: summary.sum,
196 },
197 MetricValue::Sketch(sketch) => match sketch.sketch.unwrap() {
198 sketch::Sketch::AgentDdSketch(ddsketch) => Self::Sketch {
199 sketch: ddsketch.into(),
200 },
201 },
202 }
203 }
204}
205
206impl From<Metric> for super::Metric {
207 fn from(metric: Metric) -> Self {
208 let kind = match metric.kind() {
209 metric::Kind::Incremental => super::MetricKind::Incremental,
210 metric::Kind::Absolute => super::MetricKind::Absolute,
211 };
212
213 let name = metric.name;
214
215 let namespace = (!metric.namespace.is_empty()).then_some(metric.namespace);
216
217 let timestamp = metric.timestamp.map(|ts| {
218 chrono::Utc
219 .timestamp_opt(ts.seconds, ts.nanos as u32)
220 .single()
221 .expect("invalid timestamp")
222 });
223
224 let mut tags = MetricTags(
225 metric
226 .tags_v2
227 .into_iter()
228 .map(|(tag, values)| {
229 (
230 tag,
231 values
232 .values
233 .into_iter()
234 .map(|value| super::metric::TagValue::from(value.value))
235 .collect(),
236 )
237 })
238 .collect(),
239 );
240 tags.extend(metric.tags_v1);
244 let tags = (!tags.is_empty()).then_some(tags);
245
246 let value = super::MetricValue::from(metric.value.unwrap());
247
248 #[allow(deprecated)]
249 let metadata = metric
250 .metadata_full
251 .map(Into::into)
252 .or_else(|| {
253 metric
254 .metadata
255 .and_then(decode_value)
256 .map(EventMetadata::default_with_value)
257 })
258 .unwrap_or_default();
259
260 Self::new_with_metadata(name, kind, value, metadata)
261 .with_namespace(namespace)
262 .with_tags(tags)
263 .with_timestamp(timestamp)
264 .with_interval_ms(std::num::NonZeroU32::new(metric.interval_ms))
265 }
266}
267
268impl From<EventWrapper> for super::Event {
269 fn from(proto: EventWrapper) -> Self {
270 let event = proto.event.unwrap();
271
272 match event {
273 Event::Log(proto) => Self::Log(proto.into()),
274 Event::Metric(proto) => Self::Metric(proto.into()),
275 Event::Trace(proto) => Self::Trace(proto.into()),
276 }
277 }
278}
279
280impl From<super::LogEvent> for Log {
281 fn from(log_event: super::LogEvent) -> Self {
282 WithMetadata::<Self>::from(log_event).data
283 }
284}
285
286impl From<super::TraceEvent> for Trace {
287 fn from(trace: super::TraceEvent) -> Self {
288 WithMetadata::<Self>::from(trace).data
289 }
290}
291
292impl From<super::LogEvent> for WithMetadata<Log> {
293 fn from(log_event: super::LogEvent) -> Self {
294 let (value, metadata) = log_event.into_parts();
295
296 let (fields, value) = if let VrlValue::Object(fields) = value {
304 let fields = fields
306 .into_iter()
307 .map(|(k, v)| (k.into(), encode_value(v)))
308 .collect::<BTreeMap<_, _>>();
309
310 (fields, None)
311 } else {
312 let mut dummy_fields = BTreeMap::new();
316 dummy_fields.insert(".".to_owned(), encode_value(VrlValue::Null));
317
318 (dummy_fields, Some(encode_value(value)))
319 };
320
321 #[allow(deprecated)]
322 let data = Log {
323 fields,
324 value,
325 metadata: Some(encode_value(metadata.value().clone())),
326 metadata_full: Some(metadata.clone().into()),
327 };
328
329 Self { data, metadata }
330 }
331}
332
333impl From<super::TraceEvent> for WithMetadata<Trace> {
334 fn from(trace: super::TraceEvent) -> Self {
335 let (fields, metadata) = trace.into_parts();
336 let fields = fields
337 .into_iter()
338 .map(|(k, v)| (k.into(), encode_value(v)))
339 .collect::<BTreeMap<_, _>>();
340
341 #[allow(deprecated)]
342 let data = Trace {
343 fields,
344 metadata: Some(encode_value(metadata.value().clone())),
345 metadata_full: Some(metadata.clone().into()),
346 };
347
348 Self { data, metadata }
349 }
350}
351
352impl From<super::Metric> for Metric {
353 fn from(metric: super::Metric) -> Self {
354 WithMetadata::<Self>::from(metric).data
355 }
356}
357
358impl From<super::MetricValue> for MetricValue {
359 fn from(value: super::MetricValue) -> Self {
360 match value {
361 super::MetricValue::Counter { value } => Self::Counter(Counter { value }),
362 super::MetricValue::Gauge { value } => Self::Gauge(Gauge { value }),
363 super::MetricValue::Set { values } => Self::Set(Set {
364 values: values.into_iter().collect(),
365 }),
366 super::MetricValue::Distribution { samples, statistic } => {
367 Self::Distribution2(Distribution2 {
368 samples: samples.into_iter().map(Into::into).collect(),
369 statistic: match statistic {
370 super::StatisticKind::Histogram => StatisticKind::Histogram,
371 super::StatisticKind::Summary => StatisticKind::Summary,
372 }
373 .into(),
374 })
375 }
376 super::MetricValue::AggregatedHistogram {
377 buckets,
378 count,
379 sum,
380 } => Self::AggregatedHistogram3(AggregatedHistogram3 {
381 buckets: buckets.into_iter().map(Into::into).collect(),
382 count,
383 sum,
384 }),
385 super::MetricValue::AggregatedSummary {
386 quantiles,
387 count,
388 sum,
389 } => Self::AggregatedSummary3(AggregatedSummary3 {
390 quantiles: quantiles.into_iter().map(Into::into).collect(),
391 count,
392 sum,
393 }),
394 super::MetricValue::Sketch { sketch } => match sketch {
395 MetricSketch::AgentDDSketch(ddsketch) => {
396 let bin_map = ddsketch.bin_map();
397 let (keys, counts) = bin_map.into_parts();
398 let keys = keys.into_iter().map(i32::from).collect();
399 let counts = counts.into_iter().map(u32::from).collect();
400
401 Self::Sketch(Sketch {
402 sketch: Some(sketch::Sketch::AgentDdSketch(sketch::AgentDdSketch {
403 count: ddsketch.count(),
404 min: ddsketch.min().unwrap_or(f64::MAX),
405 max: ddsketch.max().unwrap_or(f64::MIN),
406 sum: ddsketch.sum().unwrap_or(0.0),
407 avg: ddsketch.avg().unwrap_or(0.0),
408 k: keys,
409 n: counts,
410 })),
411 })
412 }
413 },
414 }
415 }
416}
417
418impl From<super::Metric> for WithMetadata<Metric> {
419 fn from(metric: super::Metric) -> Self {
420 let (series, data, metadata) = metric.into_parts();
421 let name = series.name.name;
422 let namespace = series.name.namespace.unwrap_or_default();
423
424 let timestamp = data.time.timestamp.map(|ts| prost_types::Timestamp {
425 seconds: ts.timestamp(),
426 nanos: ts.timestamp_subsec_nanos() as i32,
427 });
428
429 let interval_ms = data.time.interval_ms.map_or(0, std::num::NonZeroU32::get);
430
431 let tags = series.tags.unwrap_or_default();
432
433 let kind = match data.kind {
434 super::MetricKind::Incremental => metric::Kind::Incremental,
435 super::MetricKind::Absolute => metric::Kind::Absolute,
436 }
437 .into();
438
439 let metric = MetricValue::from(data.value);
440
441 let tags_v1 = tags
444 .0
445 .iter()
446 .filter_map(|(tag, values)| {
447 values
448 .as_single()
449 .map(|value| (tag.clone(), value.to_string()))
450 })
451 .collect();
452 let tags_v2 = tags
454 .0
455 .into_iter()
456 .map(|(tag, values)| {
457 let values = values
458 .into_iter()
459 .map(|value| TagValue {
460 value: value.into_option(),
461 })
462 .collect();
463 (tag, TagValues { values })
464 })
465 .collect();
466
467 #[allow(deprecated)]
468 let data = Metric {
469 name,
470 namespace,
471 timestamp,
472 tags_v1,
473 tags_v2,
474 kind,
475 interval_ms,
476 value: Some(metric),
477 metadata: Some(encode_value(metadata.value().clone())),
478 metadata_full: Some(metadata.clone().into()),
479 };
480
481 Self { data, metadata }
482 }
483}
484
485impl From<super::Event> for Event {
486 fn from(event: super::Event) -> Self {
487 WithMetadata::<Self>::from(event).data
488 }
489}
490
491impl From<super::Event> for WithMetadata<Event> {
492 fn from(event: super::Event) -> Self {
493 match event {
494 super::Event::Log(log_event) => WithMetadata::<Log>::from(log_event).into(),
495 super::Event::Metric(metric) => WithMetadata::<Metric>::from(metric).into(),
496 super::Event::Trace(trace) => WithMetadata::<Trace>::from(trace).into(),
497 }
498 }
499}
500
501impl From<super::Event> for EventWrapper {
502 fn from(event: super::Event) -> Self {
503 WithMetadata::<EventWrapper>::from(event).data
504 }
505}
506
507impl From<super::Event> for WithMetadata<EventWrapper> {
508 fn from(event: super::Event) -> Self {
509 WithMetadata::<Event>::from(event).into()
510 }
511}
512
513impl From<AgentDDSketch> for Sketch {
514 fn from(ddsketch: AgentDDSketch) -> Self {
515 let bin_map = ddsketch.bin_map();
516 let (keys, counts) = bin_map.into_parts();
517 let ddsketch = sketch::AgentDdSketch {
518 count: ddsketch.count(),
519 min: ddsketch.min().unwrap_or(f64::MAX),
520 max: ddsketch.max().unwrap_or(f64::MIN),
521 sum: ddsketch.sum().unwrap_or(0.0),
522 avg: ddsketch.avg().unwrap_or(0.0),
523 k: keys.into_iter().map(i32::from).collect(),
524 n: counts.into_iter().map(u32::from).collect(),
525 };
526 Sketch {
527 sketch: Some(sketch::Sketch::AgentDdSketch(ddsketch)),
528 }
529 }
530}
531
532impl From<sketch::AgentDdSketch> for MetricSketch {
533 fn from(sketch: sketch::AgentDdSketch) -> Self {
534 let keys = sketch
537 .k
538 .into_iter()
539 .map(|k| (k, k > 0))
540 .map(|(k, pos)| {
541 k.try_into()
542 .unwrap_or(if pos { i16::MAX } else { i16::MIN })
543 })
544 .collect::<Vec<_>>();
545 let counts = sketch
546 .n
547 .into_iter()
548 .map(|n| n.try_into().unwrap_or(u16::MAX))
549 .collect::<Vec<_>>();
550 MetricSketch::AgentDDSketch(
551 AgentDDSketch::from_raw(
552 sketch.count,
553 sketch.min,
554 sketch.max,
555 sketch.sum,
556 sketch.avg,
557 &keys,
558 &counts,
559 )
560 .expect("keys/counts were unexpectedly mismatched"),
561 )
562 }
563}
564
565impl From<super::metadata::Secrets> for Secrets {
566 fn from(value: super::metadata::Secrets) -> Self {
567 Self {
568 entries: value.into_iter().map(|(k, v)| (k, v.to_string())).collect(),
569 }
570 }
571}
572
573impl From<Secrets> for super::metadata::Secrets {
574 fn from(value: Secrets) -> Self {
575 let mut secrets = Self::new();
576 for (k, v) in value.entries {
577 secrets.insert(k, v);
578 }
579
580 secrets
581 }
582}
583
584impl From<super::DatadogMetricOriginMetadata> for DatadogOriginMetadata {
585 fn from(value: super::DatadogMetricOriginMetadata) -> Self {
586 Self {
587 origin_product: value.product(),
588 origin_category: value.category(),
589 origin_service: value.service(),
590 }
591 }
592}
593
594impl From<DatadogOriginMetadata> for super::DatadogMetricOriginMetadata {
595 fn from(value: DatadogOriginMetadata) -> Self {
596 Self::new(
597 value.origin_product,
598 value.origin_category,
599 value.origin_service,
600 )
601 }
602}
603
604impl From<crate::config::OutputId> for OutputId {
605 fn from(value: crate::config::OutputId) -> Self {
606 Self {
607 component: value.component.into_id(),
608 port: value.port,
609 }
610 }
611}
612
613impl From<OutputId> for crate::config::OutputId {
614 fn from(value: OutputId) -> Self {
615 Self::from((value.component, value.port))
616 }
617}
618
619impl From<EventMetadata> for Metadata {
620 fn from(value: EventMetadata) -> Self {
621 let super::metadata::Inner {
622 value,
623 secrets,
624 source_id,
625 source_type,
626 upstream_id,
627 datadog_origin_metadata,
628 source_event_id,
629 ..
630 } = value.into_owned();
631
632 let secrets = (!secrets.is_empty()).then(|| secrets.into());
633
634 Self {
635 value: Some(encode_value(value)),
636 datadog_origin_metadata: datadog_origin_metadata.map(Into::into),
637 source_id: source_id.map(|s| s.to_string()),
638 source_type: source_type.map(|s| s.to_string()),
639 upstream_id: upstream_id.map(|id| id.as_ref().clone()).map(Into::into),
640 secrets,
641 source_event_id: source_event_id.map_or(vec![], std::convert::Into::into),
642 }
643 }
644}
645
646impl From<Metadata> for EventMetadata {
647 fn from(value: Metadata) -> Self {
648 let mut metadata = EventMetadata::default();
649
650 if let Some(value) = value.value.and_then(decode_value) {
651 *metadata.value_mut() = value;
652 }
653
654 if let Some(source_id) = value.source_id {
655 metadata.set_source_id(Arc::new(source_id.into()));
656 }
657
658 if let Some(source_type) = value.source_type {
659 metadata.set_source_type(source_type);
660 }
661
662 if let Some(upstream_id) = value.upstream_id {
663 metadata.set_upstream_id(Arc::new(upstream_id.into()));
664 }
665
666 if let Some(secrets) = value.secrets {
667 metadata.secrets_mut().merge(secrets.into());
668 }
669
670 if let Some(origin_metadata) = value.datadog_origin_metadata {
671 metadata = metadata.with_origin_metadata(origin_metadata.into());
672 }
673
674 let maybe_source_event_id = if value.source_event_id.is_empty() {
675 None
676 } else {
677 match Uuid::from_slice(&value.source_event_id) {
678 Ok(id) => Some(id),
679 Err(error) => {
680 error!(
681 message = "Failed to parse source_event_id: {}",
682 %error,
683 internal_log_rate_limit = true
684 );
685 None
686 }
687 }
688 };
689 metadata = metadata.with_source_event_id(maybe_source_event_id);
690
691 metadata
692 }
693}
694
695fn decode_value(input: Value) -> Option<super::Value> {
696 match input.kind {
697 Some(value::Kind::RawBytes(data)) => Some(super::Value::Bytes(data)),
698 Some(value::Kind::Timestamp(ts)) => Some(super::Value::Timestamp(
699 chrono::Utc
700 .timestamp_opt(ts.seconds, ts.nanos as u32)
701 .single()
702 .expect("invalid timestamp"),
703 )),
704 Some(value::Kind::Integer(value)) => Some(super::Value::Integer(value)),
705 Some(value::Kind::Float(value)) => Some(super::Value::Float(NotNan::new(value).unwrap())),
706 Some(value::Kind::Boolean(value)) => Some(super::Value::Boolean(value)),
707 Some(value::Kind::Map(map)) => decode_map(map.fields),
708 Some(value::Kind::Array(array)) => decode_array(array.items),
709 Some(value::Kind::Null(_)) => Some(super::Value::Null),
710 None => {
711 error!("Encoded event contains unknown value kind.");
712 None
713 }
714 }
715}
716
717fn decode_map(fields: BTreeMap<String, Value>) -> Option<super::Value> {
718 fields
719 .into_iter()
720 .map(|(key, value)| decode_value(value).map(|value| (key.into(), value)))
721 .collect::<Option<ObjectMap>>()
722 .map(event::Value::Object)
723}
724
725fn decode_array(items: Vec<Value>) -> Option<super::Value> {
726 items
727 .into_iter()
728 .map(decode_value)
729 .collect::<Option<Vec<_>>>()
730 .map(super::Value::Array)
731}
732
733fn encode_value(value: super::Value) -> Value {
734 Value {
735 kind: match value {
736 super::Value::Bytes(b) => Some(value::Kind::RawBytes(b)),
737 super::Value::Regex(regex) => Some(value::Kind::RawBytes(regex.as_bytes())),
738 super::Value::Timestamp(ts) => Some(value::Kind::Timestamp(prost_types::Timestamp {
739 seconds: ts.timestamp(),
740 nanos: ts.timestamp_subsec_nanos() as i32,
741 })),
742 super::Value::Integer(value) => Some(value::Kind::Integer(value)),
743 super::Value::Float(value) => Some(value::Kind::Float(value.into_inner())),
744 super::Value::Boolean(value) => Some(value::Kind::Boolean(value)),
745 super::Value::Object(fields) => Some(value::Kind::Map(encode_map(fields))),
746 super::Value::Array(items) => Some(value::Kind::Array(encode_array(items))),
747 super::Value::Null => Some(value::Kind::Null(ValueNull::NullValue as i32)),
748 },
749 }
750}
751
752fn encode_map(fields: ObjectMap) -> ValueMap {
753 ValueMap {
754 fields: fields
755 .into_iter()
756 .map(|(key, value)| (key.into(), encode_value(value)))
757 .collect(),
758 }
759}
760
761fn encode_array(items: Vec<super::Value>) -> ValueArray {
762 ValueArray {
763 items: items.into_iter().map(encode_value).collect(),
764 }
765}