1use std::{collections::BTreeMap, sync::Arc};
2
3use chrono::TimeZone;
4use ordered_float::NotNan;
5use uuid::Uuid;
6
7use super::{MetricTags, WithMetadata};
8use crate::{event, metrics::AgentDDSketch};
9
10#[allow(warnings, clippy::all, clippy::pedantic)]
11mod proto_event {
12 include!(concat!(env!("OUT_DIR"), "/event.rs"));
13}
14pub use event_wrapper::Event;
15pub use metric::Value as MetricValue;
16pub use proto_event::*;
17use vrl::value::{ObjectMap, Value as VrlValue};
18
19use super::{EventMetadata, array, metric::MetricSketch};
20
21impl event_array::Events {
22 fn from_logs(logs: array::LogArray) -> Self {
25 let logs = logs.into_iter().map(Into::into).collect();
26 Self::Logs(LogArray { logs })
27 }
28
29 fn from_metrics(metrics: array::MetricArray) -> Self {
30 let metrics = metrics.into_iter().map(Into::into).collect();
31 Self::Metrics(MetricArray { metrics })
32 }
33
34 fn from_traces(traces: array::TraceArray) -> Self {
35 let traces = traces.into_iter().map(Into::into).collect();
36 Self::Traces(TraceArray { traces })
37 }
38}
39
40impl From<array::EventArray> for EventArray {
41 fn from(events: array::EventArray) -> Self {
42 let events = Some(match events {
43 array::EventArray::Logs(array) => event_array::Events::from_logs(array),
44 array::EventArray::Metrics(array) => event_array::Events::from_metrics(array),
45 array::EventArray::Traces(array) => event_array::Events::from_traces(array),
46 });
47 Self { events }
48 }
49}
50
51impl From<EventArray> for array::EventArray {
52 fn from(events: EventArray) -> Self {
53 let events = events.events.unwrap();
54
55 match events {
56 event_array::Events::Logs(logs) => {
57 array::EventArray::Logs(logs.logs.into_iter().map(Into::into).collect())
58 }
59 event_array::Events::Metrics(metrics) => {
60 array::EventArray::Metrics(metrics.metrics.into_iter().map(Into::into).collect())
61 }
62 event_array::Events::Traces(traces) => {
63 array::EventArray::Traces(traces.traces.into_iter().map(Into::into).collect())
64 }
65 }
66 }
67}
68
69impl From<Event> for EventWrapper {
70 fn from(event: Event) -> Self {
71 Self { event: Some(event) }
72 }
73}
74
75impl From<Log> for Event {
76 fn from(log: Log) -> Self {
77 Self::Log(log)
78 }
79}
80
81impl From<Metric> for Event {
82 fn from(metric: Metric) -> Self {
83 Self::Metric(metric)
84 }
85}
86
87impl From<Trace> for Event {
88 fn from(trace: Trace) -> Self {
89 Self::Trace(trace)
90 }
91}
92
93impl From<Log> for super::LogEvent {
94 fn from(log: Log) -> Self {
95 #[allow(deprecated)]
96 let metadata = log
97 .metadata_full
98 .map(Into::into)
99 .or_else(|| {
100 log.metadata
101 .and_then(decode_value)
102 .map(EventMetadata::default_with_value)
103 })
104 .unwrap_or_default();
105
106 if let Some(value) = log.value {
107 Self::from_parts(decode_value(value).unwrap_or(VrlValue::Null), metadata)
108 } else {
109 let fields = log
111 .fields
112 .into_iter()
113 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
114 .collect::<ObjectMap>();
115
116 Self::from_map(fields, metadata)
117 }
118 }
119}
120
121impl From<Trace> for super::TraceEvent {
122 fn from(trace: Trace) -> Self {
123 #[allow(deprecated)]
124 let metadata = trace
125 .metadata_full
126 .map(Into::into)
127 .or_else(|| {
128 trace
129 .metadata
130 .and_then(decode_value)
131 .map(EventMetadata::default_with_value)
132 })
133 .unwrap_or_default();
134
135 let fields = trace
136 .fields
137 .into_iter()
138 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
139 .collect::<ObjectMap>();
140
141 Self::from(super::LogEvent::from_map(fields, metadata))
142 }
143}
144
145impl From<MetricValue> for super::MetricValue {
146 fn from(value: MetricValue) -> Self {
147 match value {
148 MetricValue::Counter(counter) => Self::Counter {
149 value: counter.value,
150 },
151 MetricValue::Gauge(gauge) => Self::Gauge { value: gauge.value },
152 MetricValue::Set(set) => Self::Set {
153 values: set.values.into_iter().collect(),
154 },
155 MetricValue::Distribution1(dist) => Self::Distribution {
156 statistic: dist.statistic().into(),
157 samples: super::metric::zip_samples(dist.values, dist.sample_rates),
158 },
159 MetricValue::Distribution2(dist) => Self::Distribution {
160 statistic: dist.statistic().into(),
161 samples: dist.samples.into_iter().map(Into::into).collect(),
162 },
163 MetricValue::AggregatedHistogram1(hist) => Self::AggregatedHistogram {
164 buckets: super::metric::zip_buckets(
165 hist.buckets,
166 hist.counts.iter().map(|h| u64::from(*h)),
167 ),
168 count: u64::from(hist.count),
169 sum: hist.sum,
170 },
171 MetricValue::AggregatedHistogram2(hist) => Self::AggregatedHistogram {
172 buckets: hist.buckets.into_iter().map(Into::into).collect(),
173 count: u64::from(hist.count),
174 sum: hist.sum,
175 },
176 MetricValue::AggregatedHistogram3(hist) => Self::AggregatedHistogram {
177 buckets: hist.buckets.into_iter().map(Into::into).collect(),
178 count: hist.count,
179 sum: hist.sum,
180 },
181 MetricValue::AggregatedSummary1(summary) => Self::AggregatedSummary {
182 quantiles: super::metric::zip_quantiles(summary.quantiles, summary.values),
183 count: u64::from(summary.count),
184 sum: summary.sum,
185 },
186 MetricValue::AggregatedSummary2(summary) => Self::AggregatedSummary {
187 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
188 count: u64::from(summary.count),
189 sum: summary.sum,
190 },
191 MetricValue::AggregatedSummary3(summary) => Self::AggregatedSummary {
192 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
193 count: summary.count,
194 sum: summary.sum,
195 },
196 MetricValue::Sketch(sketch) => match sketch.sketch.unwrap() {
197 sketch::Sketch::AgentDdSketch(ddsketch) => Self::Sketch {
198 sketch: ddsketch.into(),
199 },
200 },
201 }
202 }
203}
204
205impl From<Metric> for super::Metric {
206 fn from(metric: Metric) -> Self {
207 let kind = match metric.kind() {
208 metric::Kind::Incremental => super::MetricKind::Incremental,
209 metric::Kind::Absolute => super::MetricKind::Absolute,
210 };
211
212 let name = metric.name;
213
214 let namespace = (!metric.namespace.is_empty()).then_some(metric.namespace);
215
216 let timestamp = metric.timestamp.map(|ts| {
217 chrono::Utc
218 .timestamp_opt(ts.seconds, ts.nanos as u32)
219 .single()
220 .expect("invalid timestamp")
221 });
222
223 let mut tags = MetricTags(
224 metric
225 .tags_v2
226 .into_iter()
227 .map(|(tag, values)| {
228 (
229 tag,
230 values
231 .values
232 .into_iter()
233 .map(|value| super::metric::TagValue::from(value.value))
234 .collect(),
235 )
236 })
237 .collect(),
238 );
239 tags.extend(metric.tags_v1);
243 let tags = (!tags.is_empty()).then_some(tags);
244
245 let value = super::MetricValue::from(metric.value.unwrap());
246
247 #[allow(deprecated)]
248 let metadata = metric
249 .metadata_full
250 .map(Into::into)
251 .or_else(|| {
252 metric
253 .metadata
254 .and_then(decode_value)
255 .map(EventMetadata::default_with_value)
256 })
257 .unwrap_or_default();
258
259 Self::new_with_metadata(name, kind, value, metadata)
260 .with_namespace(namespace)
261 .with_tags(tags)
262 .with_timestamp(timestamp)
263 .with_interval_ms(std::num::NonZeroU32::new(metric.interval_ms))
264 }
265}
266
267impl From<EventWrapper> for super::Event {
268 fn from(proto: EventWrapper) -> Self {
269 let event = proto.event.unwrap();
270
271 match event {
272 Event::Log(proto) => Self::Log(proto.into()),
273 Event::Metric(proto) => Self::Metric(proto.into()),
274 Event::Trace(proto) => Self::Trace(proto.into()),
275 }
276 }
277}
278
279impl From<super::LogEvent> for Log {
280 fn from(log_event: super::LogEvent) -> Self {
281 WithMetadata::<Self>::from(log_event).data
282 }
283}
284
285impl From<super::TraceEvent> for Trace {
286 fn from(trace: super::TraceEvent) -> Self {
287 WithMetadata::<Self>::from(trace).data
288 }
289}
290
291impl From<super::LogEvent> for WithMetadata<Log> {
292 fn from(log_event: super::LogEvent) -> Self {
293 let (value, metadata) = log_event.into_parts();
294
295 let (fields, value) = if let VrlValue::Object(fields) = value {
303 let fields = fields
305 .into_iter()
306 .map(|(k, v)| (k.into(), encode_value(v)))
307 .collect::<BTreeMap<_, _>>();
308
309 (fields, None)
310 } else {
311 let mut dummy_fields = BTreeMap::new();
315 dummy_fields.insert(".".to_owned(), encode_value(VrlValue::Null));
316
317 (dummy_fields, Some(encode_value(value)))
318 };
319
320 #[allow(deprecated)]
321 let data = Log {
322 fields,
323 value,
324 metadata: Some(encode_value(metadata.value().clone())),
325 metadata_full: Some(metadata.clone().into()),
326 };
327
328 Self { data, metadata }
329 }
330}
331
332impl From<super::TraceEvent> for WithMetadata<Trace> {
333 fn from(trace: super::TraceEvent) -> Self {
334 let (fields, metadata) = trace.into_parts();
335 let fields = fields
336 .into_iter()
337 .map(|(k, v)| (k.into(), encode_value(v)))
338 .collect::<BTreeMap<_, _>>();
339
340 #[allow(deprecated)]
341 let data = Trace {
342 fields,
343 metadata: Some(encode_value(metadata.value().clone())),
344 metadata_full: Some(metadata.clone().into()),
345 };
346
347 Self { data, metadata }
348 }
349}
350
351impl From<super::Metric> for Metric {
352 fn from(metric: super::Metric) -> Self {
353 WithMetadata::<Self>::from(metric).data
354 }
355}
356
357impl From<super::MetricValue> for MetricValue {
358 fn from(value: super::MetricValue) -> Self {
359 match value {
360 super::MetricValue::Counter { value } => Self::Counter(Counter { value }),
361 super::MetricValue::Gauge { value } => Self::Gauge(Gauge { value }),
362 super::MetricValue::Set { values } => Self::Set(Set {
363 values: values.into_iter().collect(),
364 }),
365 super::MetricValue::Distribution { samples, statistic } => {
366 Self::Distribution2(Distribution2 {
367 samples: samples.into_iter().map(Into::into).collect(),
368 statistic: match statistic {
369 super::StatisticKind::Histogram => StatisticKind::Histogram,
370 super::StatisticKind::Summary => StatisticKind::Summary,
371 }
372 .into(),
373 })
374 }
375 super::MetricValue::AggregatedHistogram {
376 buckets,
377 count,
378 sum,
379 } => Self::AggregatedHistogram3(AggregatedHistogram3 {
380 buckets: buckets.into_iter().map(Into::into).collect(),
381 count,
382 sum,
383 }),
384 super::MetricValue::AggregatedSummary {
385 quantiles,
386 count,
387 sum,
388 } => Self::AggregatedSummary3(AggregatedSummary3 {
389 quantiles: quantiles.into_iter().map(Into::into).collect(),
390 count,
391 sum,
392 }),
393 super::MetricValue::Sketch { sketch } => match sketch {
394 MetricSketch::AgentDDSketch(ddsketch) => {
395 let bin_map = ddsketch.bin_map();
396 let (keys, counts) = bin_map.into_parts();
397 let keys = keys.into_iter().map(i32::from).collect();
398 let counts = counts.into_iter().map(u32::from).collect();
399
400 Self::Sketch(Sketch {
401 sketch: Some(sketch::Sketch::AgentDdSketch(sketch::AgentDdSketch {
402 count: ddsketch.count(),
403 min: ddsketch.min().unwrap_or(f64::MAX),
404 max: ddsketch.max().unwrap_or(f64::MIN),
405 sum: ddsketch.sum().unwrap_or(0.0),
406 avg: ddsketch.avg().unwrap_or(0.0),
407 k: keys,
408 n: counts,
409 })),
410 })
411 }
412 },
413 }
414 }
415}
416
417impl From<super::Metric> for WithMetadata<Metric> {
418 fn from(metric: super::Metric) -> Self {
419 let (series, data, metadata) = metric.into_parts();
420 let name = series.name.name;
421 let namespace = series.name.namespace.unwrap_or_default();
422
423 let timestamp = data.time.timestamp.map(|ts| prost_types::Timestamp {
424 seconds: ts.timestamp(),
425 nanos: ts.timestamp_subsec_nanos() as i32,
426 });
427
428 let interval_ms = data.time.interval_ms.map_or(0, std::num::NonZeroU32::get);
429
430 let tags = series.tags.unwrap_or_default();
431
432 let kind = match data.kind {
433 super::MetricKind::Incremental => metric::Kind::Incremental,
434 super::MetricKind::Absolute => metric::Kind::Absolute,
435 }
436 .into();
437
438 let metric = MetricValue::from(data.value);
439
440 let tags_v1 = tags
443 .0
444 .iter()
445 .filter_map(|(tag, values)| {
446 values
447 .as_single()
448 .map(|value| (tag.clone(), value.to_string()))
449 })
450 .collect();
451 let tags_v2 = tags
453 .0
454 .into_iter()
455 .map(|(tag, values)| {
456 let values = values
457 .into_iter()
458 .map(|value| TagValue {
459 value: value.into_option(),
460 })
461 .collect();
462 (tag, TagValues { values })
463 })
464 .collect();
465
466 #[allow(deprecated)]
467 let data = Metric {
468 name,
469 namespace,
470 timestamp,
471 tags_v1,
472 tags_v2,
473 kind,
474 interval_ms,
475 value: Some(metric),
476 metadata: Some(encode_value(metadata.value().clone())),
477 metadata_full: Some(metadata.clone().into()),
478 };
479
480 Self { data, metadata }
481 }
482}
483
484impl From<super::Event> for Event {
485 fn from(event: super::Event) -> Self {
486 WithMetadata::<Self>::from(event).data
487 }
488}
489
490impl From<super::Event> for WithMetadata<Event> {
491 fn from(event: super::Event) -> Self {
492 match event {
493 super::Event::Log(log_event) => WithMetadata::<Log>::from(log_event).into(),
494 super::Event::Metric(metric) => WithMetadata::<Metric>::from(metric).into(),
495 super::Event::Trace(trace) => WithMetadata::<Trace>::from(trace).into(),
496 }
497 }
498}
499
500impl From<super::Event> for EventWrapper {
501 fn from(event: super::Event) -> Self {
502 WithMetadata::<EventWrapper>::from(event).data
503 }
504}
505
506impl From<super::Event> for WithMetadata<EventWrapper> {
507 fn from(event: super::Event) -> Self {
508 WithMetadata::<Event>::from(event).into()
509 }
510}
511
512impl From<AgentDDSketch> for Sketch {
513 fn from(ddsketch: AgentDDSketch) -> Self {
514 let bin_map = ddsketch.bin_map();
515 let (keys, counts) = bin_map.into_parts();
516 let ddsketch = sketch::AgentDdSketch {
517 count: ddsketch.count(),
518 min: ddsketch.min().unwrap_or(f64::MAX),
519 max: ddsketch.max().unwrap_or(f64::MIN),
520 sum: ddsketch.sum().unwrap_or(0.0),
521 avg: ddsketch.avg().unwrap_or(0.0),
522 k: keys.into_iter().map(i32::from).collect(),
523 n: counts.into_iter().map(u32::from).collect(),
524 };
525 Sketch {
526 sketch: Some(sketch::Sketch::AgentDdSketch(ddsketch)),
527 }
528 }
529}
530
531impl From<sketch::AgentDdSketch> for MetricSketch {
532 fn from(sketch: sketch::AgentDdSketch) -> Self {
533 let keys = sketch
536 .k
537 .into_iter()
538 .map(|k| (k, k > 0))
539 .map(|(k, pos)| {
540 k.try_into()
541 .unwrap_or(if pos { i16::MAX } else { i16::MIN })
542 })
543 .collect::<Vec<_>>();
544 let counts = sketch
545 .n
546 .into_iter()
547 .map(|n| n.try_into().unwrap_or(u16::MAX))
548 .collect::<Vec<_>>();
549 MetricSketch::AgentDDSketch(
550 AgentDDSketch::from_raw(
551 sketch.count,
552 sketch.min,
553 sketch.max,
554 sketch.sum,
555 sketch.avg,
556 &keys,
557 &counts,
558 )
559 .expect("keys/counts were unexpectedly mismatched"),
560 )
561 }
562}
563
564impl From<super::metadata::Secrets> for Secrets {
565 fn from(value: super::metadata::Secrets) -> Self {
566 Self {
567 entries: value.into_iter().map(|(k, v)| (k, v.to_string())).collect(),
568 }
569 }
570}
571
572impl From<Secrets> for super::metadata::Secrets {
573 fn from(value: Secrets) -> Self {
574 let mut secrets = Self::new();
575 for (k, v) in value.entries {
576 secrets.insert(k, v);
577 }
578
579 secrets
580 }
581}
582
583impl From<super::DatadogMetricOriginMetadata> for DatadogOriginMetadata {
584 fn from(value: super::DatadogMetricOriginMetadata) -> Self {
585 Self {
586 origin_product: value.product(),
587 origin_category: value.category(),
588 origin_service: value.service(),
589 }
590 }
591}
592
593impl From<DatadogOriginMetadata> for super::DatadogMetricOriginMetadata {
594 fn from(value: DatadogOriginMetadata) -> Self {
595 Self::new(
596 value.origin_product,
597 value.origin_category,
598 value.origin_service,
599 )
600 }
601}
602
603impl From<crate::config::OutputId> for OutputId {
604 fn from(value: crate::config::OutputId) -> Self {
605 Self {
606 component: value.component.into_id(),
607 port: value.port,
608 }
609 }
610}
611
612impl From<OutputId> for crate::config::OutputId {
613 fn from(value: OutputId) -> Self {
614 Self::from((value.component, value.port))
615 }
616}
617
618impl From<EventMetadata> for Metadata {
619 fn from(value: EventMetadata) -> Self {
620 let super::metadata::Inner {
621 value,
622 secrets,
623 source_id,
624 source_type,
625 upstream_id,
626 datadog_origin_metadata,
627 source_event_id,
628 ..
629 } = value.into_owned();
630
631 let secrets = (!secrets.is_empty()).then(|| secrets.into());
632
633 Self {
634 value: Some(encode_value(value)),
635 datadog_origin_metadata: datadog_origin_metadata.map(Into::into),
636 source_id: source_id.map(|s| s.to_string()),
637 source_type: source_type.map(|s| s.to_string()),
638 upstream_id: upstream_id.map(|id| id.as_ref().clone()).map(Into::into),
639 secrets,
640 source_event_id: source_event_id.map_or(vec![], std::convert::Into::into),
641 }
642 }
643}
644
645impl From<Metadata> for EventMetadata {
646 fn from(value: Metadata) -> Self {
647 let mut metadata = EventMetadata::default();
648
649 if let Some(value) = value.value.and_then(decode_value) {
650 *metadata.value_mut() = value;
651 }
652
653 if let Some(source_id) = value.source_id {
654 metadata.set_source_id(Arc::new(source_id.into()));
655 }
656
657 if let Some(source_type) = value.source_type {
658 metadata.set_source_type(source_type);
659 }
660
661 if let Some(upstream_id) = value.upstream_id {
662 metadata.set_upstream_id(Arc::new(upstream_id.into()));
663 }
664
665 if let Some(secrets) = value.secrets {
666 metadata.secrets_mut().merge(secrets.into());
667 }
668
669 if let Some(origin_metadata) = value.datadog_origin_metadata {
670 metadata = metadata.with_origin_metadata(origin_metadata.into());
671 }
672
673 let maybe_source_event_id = if value.source_event_id.is_empty() {
674 None
675 } else {
676 match Uuid::from_slice(&value.source_event_id) {
677 Ok(id) => Some(id),
678 Err(error) => {
679 error!(
680 message = "Failed to parse source_event_id: {}",
681 %error,
682 internal_log_rate_limit = true
683 );
684 None
685 }
686 }
687 };
688 metadata = metadata.with_source_event_id(maybe_source_event_id);
689
690 metadata
691 }
692}
693
694fn decode_value(input: Value) -> Option<super::Value> {
695 match input.kind {
696 Some(value::Kind::RawBytes(data)) => Some(super::Value::Bytes(data)),
697 Some(value::Kind::Timestamp(ts)) => Some(super::Value::Timestamp(
698 chrono::Utc
699 .timestamp_opt(ts.seconds, ts.nanos as u32)
700 .single()
701 .expect("invalid timestamp"),
702 )),
703 Some(value::Kind::Integer(value)) => Some(super::Value::Integer(value)),
704 Some(value::Kind::Float(value)) => Some(super::Value::Float(NotNan::new(value).unwrap())),
705 Some(value::Kind::Boolean(value)) => Some(super::Value::Boolean(value)),
706 Some(value::Kind::Map(map)) => decode_map(map.fields),
707 Some(value::Kind::Array(array)) => decode_array(array.items),
708 Some(value::Kind::Null(_)) => Some(super::Value::Null),
709 None => {
710 error!("Encoded event contains unknown value kind.");
711 None
712 }
713 }
714}
715
716fn decode_map(fields: BTreeMap<String, Value>) -> Option<super::Value> {
717 fields
718 .into_iter()
719 .map(|(key, value)| decode_value(value).map(|value| (key.into(), value)))
720 .collect::<Option<ObjectMap>>()
721 .map(event::Value::Object)
722}
723
724fn decode_array(items: Vec<Value>) -> Option<super::Value> {
725 items
726 .into_iter()
727 .map(decode_value)
728 .collect::<Option<Vec<_>>>()
729 .map(super::Value::Array)
730}
731
732fn encode_value(value: super::Value) -> Value {
733 Value {
734 kind: match value {
735 super::Value::Bytes(b) => Some(value::Kind::RawBytes(b)),
736 super::Value::Regex(regex) => Some(value::Kind::RawBytes(regex.as_bytes())),
737 super::Value::Timestamp(ts) => Some(value::Kind::Timestamp(prost_types::Timestamp {
738 seconds: ts.timestamp(),
739 nanos: ts.timestamp_subsec_nanos() as i32,
740 })),
741 super::Value::Integer(value) => Some(value::Kind::Integer(value)),
742 super::Value::Float(value) => Some(value::Kind::Float(value.into_inner())),
743 super::Value::Boolean(value) => Some(value::Kind::Boolean(value)),
744 super::Value::Object(fields) => Some(value::Kind::Map(encode_map(fields))),
745 super::Value::Array(items) => Some(value::Kind::Array(encode_array(items))),
746 super::Value::Null => Some(value::Kind::Null(ValueNull::NullValue as i32)),
747 },
748 }
749}
750
751fn encode_map(fields: ObjectMap) -> ValueMap {
752 ValueMap {
753 fields: fields
754 .into_iter()
755 .map(|(key, value)| (key.into(), encode_value(value)))
756 .collect(),
757 }
758}
759
760fn encode_array(items: Vec<super::Value>) -> ValueArray {
761 ValueArray {
762 items: items.into_iter().map(encode_value).collect(),
763 }
764}