1use std::{collections::BTreeMap, sync::Arc};
2
3use chrono::TimeZone;
4use ordered_float::NotNan;
5use uuid::Uuid;
6
7use super::{MetricTags, WithMetadata};
8use crate::{event, metrics::AgentDDSketch};
9
// Protobuf event types, textually included from `event.rs` in the build's `OUT_DIR`
// (presumably emitted by the crate's build script -- confirm against `build.rs`).
// Warnings and clippy lints are silenced for the whole module because its contents
// come from the included file rather than from hand-written code here.
#[allow(warnings, clippy::all, clippy::pedantic)]
mod proto_event {
    include!(concat!(env!("OUT_DIR"), "/event.rs"));
}
14pub use event_wrapper::Event;
15pub use metric::Value as MetricValue;
16pub use proto_event::*;
17use vrl::value::{ObjectMap, Value as VrlValue};
18
19use super::EventFinalizers;
20use super::metadata::{Inner, default_schema_definition};
21use super::{EventMetadata, array, metric::MetricSketch};
22
23impl event_array::Events {
24 fn from_logs(logs: array::LogArray) -> Self {
27 let logs = logs.into_iter().map(Into::into).collect();
28 Self::Logs(LogArray { logs })
29 }
30
31 fn from_metrics(metrics: array::MetricArray) -> Self {
32 let metrics = metrics.into_iter().map(Into::into).collect();
33 Self::Metrics(MetricArray { metrics })
34 }
35
36 fn from_traces(traces: array::TraceArray) -> Self {
37 let traces = traces.into_iter().map(Into::into).collect();
38 Self::Traces(TraceArray { traces })
39 }
40}
41
42impl From<array::EventArray> for EventArray {
43 fn from(events: array::EventArray) -> Self {
44 let events = Some(match events {
45 array::EventArray::Logs(array) => event_array::Events::from_logs(array),
46 array::EventArray::Metrics(array) => event_array::Events::from_metrics(array),
47 array::EventArray::Traces(array) => event_array::Events::from_traces(array),
48 });
49 Self { events }
50 }
51}
52
53impl From<EventArray> for array::EventArray {
54 fn from(events: EventArray) -> Self {
55 let events = events.events.unwrap();
56
57 match events {
58 event_array::Events::Logs(logs) => {
59 array::EventArray::Logs(logs.logs.into_iter().map(Into::into).collect())
60 }
61 event_array::Events::Metrics(metrics) => {
62 array::EventArray::Metrics(metrics.metrics.into_iter().map(Into::into).collect())
63 }
64 event_array::Events::Traces(traces) => {
65 array::EventArray::Traces(traces.traces.into_iter().map(Into::into).collect())
66 }
67 }
68 }
69}
70
71impl From<Event> for EventWrapper {
72 fn from(event: Event) -> Self {
73 Self { event: Some(event) }
74 }
75}
76
77impl From<Log> for Event {
78 fn from(log: Log) -> Self {
79 Self::Log(log)
80 }
81}
82
83impl From<Metric> for Event {
84 fn from(metric: Metric) -> Self {
85 Self::Metric(metric)
86 }
87}
88
89impl From<Trace> for Event {
90 fn from(trace: Trace) -> Self {
91 Self::Trace(trace)
92 }
93}
94
95impl From<Log> for super::LogEvent {
96 fn from(log: Log) -> Self {
97 #[allow(deprecated)]
98 let metadata = log
99 .metadata_full
100 .map(Into::into)
101 .or_else(|| {
102 log.metadata
103 .and_then(decode_value)
104 .map(EventMetadata::default_with_value)
105 })
106 .unwrap_or_default();
107
108 if let Some(value) = log.value {
109 Self::from_parts(decode_value(value).unwrap_or(VrlValue::Null), metadata)
110 } else {
111 let fields = log
113 .fields
114 .into_iter()
115 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
116 .collect::<ObjectMap>();
117
118 Self::from_map(fields, metadata)
119 }
120 }
121}
122
123impl From<Trace> for super::TraceEvent {
124 fn from(trace: Trace) -> Self {
125 #[allow(deprecated)]
126 let metadata = trace
127 .metadata_full
128 .map(Into::into)
129 .or_else(|| {
130 trace
131 .metadata
132 .and_then(decode_value)
133 .map(EventMetadata::default_with_value)
134 })
135 .unwrap_or_default();
136
137 let fields = trace
138 .fields
139 .into_iter()
140 .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value)))
141 .collect::<ObjectMap>();
142
143 Self::from(super::LogEvent::from_map(fields, metadata))
144 }
145}
146
147impl From<MetricValue> for super::MetricValue {
148 fn from(value: MetricValue) -> Self {
149 match value {
150 MetricValue::Counter(counter) => Self::Counter {
151 value: counter.value,
152 },
153 MetricValue::Gauge(gauge) => Self::Gauge { value: gauge.value },
154 MetricValue::Set(set) => Self::Set {
155 values: set.values.into_iter().collect(),
156 },
157 MetricValue::Distribution1(dist) => Self::Distribution {
158 statistic: dist.statistic().into(),
159 samples: super::metric::zip_samples(dist.values, dist.sample_rates),
160 },
161 MetricValue::Distribution2(dist) => Self::Distribution {
162 statistic: dist.statistic().into(),
163 samples: dist.samples.into_iter().map(Into::into).collect(),
164 },
165 MetricValue::AggregatedHistogram1(hist) => Self::AggregatedHistogram {
166 buckets: super::metric::zip_buckets(
167 hist.buckets,
168 hist.counts.iter().map(|h| u64::from(*h)),
169 ),
170 count: u64::from(hist.count),
171 sum: hist.sum,
172 },
173 MetricValue::AggregatedHistogram2(hist) => Self::AggregatedHistogram {
174 buckets: hist.buckets.into_iter().map(Into::into).collect(),
175 count: u64::from(hist.count),
176 sum: hist.sum,
177 },
178 MetricValue::AggregatedHistogram3(hist) => Self::AggregatedHistogram {
179 buckets: hist.buckets.into_iter().map(Into::into).collect(),
180 count: hist.count,
181 sum: hist.sum,
182 },
183 MetricValue::AggregatedSummary1(summary) => Self::AggregatedSummary {
184 quantiles: super::metric::zip_quantiles(summary.quantiles, summary.values),
185 count: u64::from(summary.count),
186 sum: summary.sum,
187 },
188 MetricValue::AggregatedSummary2(summary) => Self::AggregatedSummary {
189 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
190 count: u64::from(summary.count),
191 sum: summary.sum,
192 },
193 MetricValue::AggregatedSummary3(summary) => Self::AggregatedSummary {
194 quantiles: summary.quantiles.into_iter().map(Into::into).collect(),
195 count: summary.count,
196 sum: summary.sum,
197 },
198 MetricValue::Sketch(sketch) => match sketch.sketch.unwrap() {
199 sketch::Sketch::AgentDdSketch(ddsketch) => Self::Sketch {
200 sketch: ddsketch.into(),
201 },
202 },
203 }
204 }
205}
206
207impl From<Metric> for super::Metric {
208 fn from(metric: Metric) -> Self {
209 let kind = match metric.kind() {
210 metric::Kind::Incremental => super::MetricKind::Incremental,
211 metric::Kind::Absolute => super::MetricKind::Absolute,
212 };
213
214 let name = metric.name;
215
216 let namespace = (!metric.namespace.is_empty()).then_some(metric.namespace);
217
218 let timestamp = metric.timestamp.map(|ts| {
219 chrono::Utc
220 .timestamp_opt(ts.seconds, ts.nanos as u32)
221 .single()
222 .expect("invalid timestamp")
223 });
224
225 let mut tags = MetricTags(
226 metric
227 .tags_v2
228 .into_iter()
229 .map(|(tag, values)| {
230 (
231 tag,
232 values
233 .values
234 .into_iter()
235 .map(|value| super::metric::TagValue::from(value.value))
236 .collect(),
237 )
238 })
239 .collect(),
240 );
241 tags.extend(metric.tags_v1);
245 let tags = (!tags.is_empty()).then_some(tags);
246
247 let value = super::MetricValue::from(metric.value.unwrap());
248
249 #[allow(deprecated)]
250 let metadata = metric
251 .metadata_full
252 .map(Into::into)
253 .or_else(|| {
254 metric
255 .metadata
256 .and_then(decode_value)
257 .map(EventMetadata::default_with_value)
258 })
259 .unwrap_or_default();
260
261 Self::new_with_metadata(name, kind, value, metadata)
262 .with_namespace(namespace)
263 .with_tags(tags)
264 .with_timestamp(timestamp)
265 .with_interval_ms(std::num::NonZeroU32::new(metric.interval_ms))
266 }
267}
268
269impl From<EventWrapper> for super::Event {
270 fn from(proto: EventWrapper) -> Self {
271 let event = proto.event.unwrap();
272
273 match event {
274 Event::Log(proto) => Self::Log(proto.into()),
275 Event::Metric(proto) => Self::Metric(proto.into()),
276 Event::Trace(proto) => Self::Trace(proto.into()),
277 }
278 }
279}
280
281impl From<super::LogEvent> for Log {
282 fn from(log_event: super::LogEvent) -> Self {
283 WithMetadata::<Self>::from(log_event).data
284 }
285}
286
287impl From<super::TraceEvent> for Trace {
288 fn from(trace: super::TraceEvent) -> Self {
289 WithMetadata::<Self>::from(trace).data
290 }
291}
292
293impl From<super::LogEvent> for WithMetadata<Log> {
294 fn from(log_event: super::LogEvent) -> Self {
295 let (value, metadata) = log_event.into_parts();
296
297 let (fields, value) = if let VrlValue::Object(fields) = value {
305 let fields = fields
307 .into_iter()
308 .map(|(k, v)| (k.into(), encode_value(v)))
309 .collect::<BTreeMap<_, _>>();
310
311 (fields, None)
312 } else {
313 let mut dummy_fields = BTreeMap::new();
317 dummy_fields.insert(".".to_owned(), encode_value(VrlValue::Null));
318
319 (dummy_fields, Some(encode_value(value)))
320 };
321
322 #[allow(deprecated)]
323 let data = Log {
324 fields,
325 value,
326 metadata: Some(encode_value(metadata.value().clone())),
327 metadata_full: Some(metadata.clone().into()),
328 };
329
330 Self { data, metadata }
331 }
332}
333
334impl From<super::TraceEvent> for WithMetadata<Trace> {
335 fn from(trace: super::TraceEvent) -> Self {
336 let (fields, metadata) = trace.into_parts();
337 let fields = fields
338 .into_iter()
339 .map(|(k, v)| (k.into(), encode_value(v)))
340 .collect::<BTreeMap<_, _>>();
341
342 #[allow(deprecated)]
343 let data = Trace {
344 fields,
345 metadata: Some(encode_value(metadata.value().clone())),
346 metadata_full: Some(metadata.clone().into()),
347 };
348
349 Self { data, metadata }
350 }
351}
352
353impl From<super::Metric> for Metric {
354 fn from(metric: super::Metric) -> Self {
355 WithMetadata::<Self>::from(metric).data
356 }
357}
358
359impl From<super::MetricValue> for MetricValue {
360 fn from(value: super::MetricValue) -> Self {
361 match value {
362 super::MetricValue::Counter { value } => Self::Counter(Counter { value }),
363 super::MetricValue::Gauge { value } => Self::Gauge(Gauge { value }),
364 super::MetricValue::Set { values } => Self::Set(Set {
365 values: values.into_iter().collect(),
366 }),
367 super::MetricValue::Distribution { samples, statistic } => {
368 Self::Distribution2(Distribution2 {
369 samples: samples.into_iter().map(Into::into).collect(),
370 statistic: match statistic {
371 super::StatisticKind::Histogram => StatisticKind::Histogram,
372 super::StatisticKind::Summary => StatisticKind::Summary,
373 }
374 .into(),
375 })
376 }
377 super::MetricValue::AggregatedHistogram {
378 buckets,
379 count,
380 sum,
381 } => Self::AggregatedHistogram3(AggregatedHistogram3 {
382 buckets: buckets.into_iter().map(Into::into).collect(),
383 count,
384 sum,
385 }),
386 super::MetricValue::AggregatedSummary {
387 quantiles,
388 count,
389 sum,
390 } => Self::AggregatedSummary3(AggregatedSummary3 {
391 quantiles: quantiles.into_iter().map(Into::into).collect(),
392 count,
393 sum,
394 }),
395 super::MetricValue::Sketch { sketch } => match sketch {
396 MetricSketch::AgentDDSketch(ddsketch) => {
397 let bin_map = ddsketch.bin_map();
398 let (keys, counts) = bin_map.into_parts();
399 let keys = keys.into_iter().map(i32::from).collect();
400 let counts = counts.into_iter().map(u32::from).collect();
401
402 Self::Sketch(Sketch {
403 sketch: Some(sketch::Sketch::AgentDdSketch(sketch::AgentDdSketch {
404 count: ddsketch.count(),
405 min: ddsketch.min().unwrap_or(f64::MAX),
406 max: ddsketch.max().unwrap_or(f64::MIN),
407 sum: ddsketch.sum().unwrap_or(0.0),
408 avg: ddsketch.avg().unwrap_or(0.0),
409 k: keys,
410 n: counts,
411 })),
412 })
413 }
414 },
415 }
416 }
417}
418
419impl From<super::Metric> for WithMetadata<Metric> {
420 fn from(metric: super::Metric) -> Self {
421 let (series, data, metadata) = metric.into_parts();
422 let name = series.name.name;
423 let namespace = series.name.namespace.unwrap_or_default();
424
425 let timestamp = data.time.timestamp.map(|ts| prost_types::Timestamp {
426 seconds: ts.timestamp(),
427 nanos: ts.timestamp_subsec_nanos() as i32,
428 });
429
430 let interval_ms = data.time.interval_ms.map_or(0, std::num::NonZeroU32::get);
431
432 let tags = series.tags.unwrap_or_default();
433
434 let kind = match data.kind {
435 super::MetricKind::Incremental => metric::Kind::Incremental,
436 super::MetricKind::Absolute => metric::Kind::Absolute,
437 }
438 .into();
439
440 let metric = MetricValue::from(data.value);
441
442 let tags_v1 = tags
445 .0
446 .iter()
447 .filter_map(|(tag, values)| {
448 values
449 .as_single()
450 .map(|value| (tag.clone(), value.to_string()))
451 })
452 .collect();
453 let tags_v2 = tags
455 .0
456 .into_iter()
457 .map(|(tag, values)| {
458 let values = values
459 .into_iter()
460 .map(|value| TagValue {
461 value: value.into_option(),
462 })
463 .collect();
464 (tag, TagValues { values })
465 })
466 .collect();
467
468 #[allow(deprecated)]
469 let data = Metric {
470 name,
471 namespace,
472 timestamp,
473 tags_v1,
474 tags_v2,
475 kind,
476 interval_ms,
477 value: Some(metric),
478 metadata: Some(encode_value(metadata.value().clone())),
479 metadata_full: Some(metadata.clone().into()),
480 };
481
482 Self { data, metadata }
483 }
484}
485
486impl From<super::Event> for Event {
487 fn from(event: super::Event) -> Self {
488 WithMetadata::<Self>::from(event).data
489 }
490}
491
492impl From<super::Event> for WithMetadata<Event> {
493 fn from(event: super::Event) -> Self {
494 match event {
495 super::Event::Log(log_event) => WithMetadata::<Log>::from(log_event).into(),
496 super::Event::Metric(metric) => WithMetadata::<Metric>::from(metric).into(),
497 super::Event::Trace(trace) => WithMetadata::<Trace>::from(trace).into(),
498 }
499 }
500}
501
502impl From<super::Event> for EventWrapper {
503 fn from(event: super::Event) -> Self {
504 WithMetadata::<EventWrapper>::from(event).data
505 }
506}
507
508impl From<super::Event> for WithMetadata<EventWrapper> {
509 fn from(event: super::Event) -> Self {
510 WithMetadata::<Event>::from(event).into()
511 }
512}
513
514impl From<AgentDDSketch> for Sketch {
515 fn from(ddsketch: AgentDDSketch) -> Self {
516 let bin_map = ddsketch.bin_map();
517 let (keys, counts) = bin_map.into_parts();
518 let ddsketch = sketch::AgentDdSketch {
519 count: ddsketch.count(),
520 min: ddsketch.min().unwrap_or(f64::MAX),
521 max: ddsketch.max().unwrap_or(f64::MIN),
522 sum: ddsketch.sum().unwrap_or(0.0),
523 avg: ddsketch.avg().unwrap_or(0.0),
524 k: keys.into_iter().map(i32::from).collect(),
525 n: counts.into_iter().map(u32::from).collect(),
526 };
527 Sketch {
528 sketch: Some(sketch::Sketch::AgentDdSketch(ddsketch)),
529 }
530 }
531}
532
533impl From<sketch::AgentDdSketch> for MetricSketch {
534 fn from(sketch: sketch::AgentDdSketch) -> Self {
535 let keys = sketch
538 .k
539 .into_iter()
540 .map(|k| (k, k > 0))
541 .map(|(k, pos)| {
542 k.try_into()
543 .unwrap_or(if pos { i16::MAX } else { i16::MIN })
544 })
545 .collect::<Vec<_>>();
546 let counts = sketch
547 .n
548 .into_iter()
549 .map(|n| n.try_into().unwrap_or(u16::MAX))
550 .collect::<Vec<_>>();
551 MetricSketch::AgentDDSketch(
552 AgentDDSketch::from_raw(
553 sketch.count,
554 sketch.min,
555 sketch.max,
556 sketch.sum,
557 sketch.avg,
558 &keys,
559 &counts,
560 )
561 .expect("keys/counts were unexpectedly mismatched"),
562 )
563 }
564}
565
566impl From<super::metadata::Secrets> for Secrets {
567 fn from(value: super::metadata::Secrets) -> Self {
568 Self {
569 entries: value.into_iter().map(|(k, v)| (k, v.to_string())).collect(),
570 }
571 }
572}
573
574impl From<Secrets> for super::metadata::Secrets {
575 fn from(value: Secrets) -> Self {
576 let mut secrets = Self::new();
577 for (k, v) in value.entries {
578 secrets.insert(k, v);
579 }
580
581 secrets
582 }
583}
584
585impl From<super::DatadogMetricOriginMetadata> for DatadogOriginMetadata {
586 fn from(value: super::DatadogMetricOriginMetadata) -> Self {
587 Self {
588 origin_product: value.product(),
589 origin_category: value.category(),
590 origin_service: value.service(),
591 }
592 }
593}
594
595impl From<DatadogOriginMetadata> for super::DatadogMetricOriginMetadata {
596 fn from(value: DatadogOriginMetadata) -> Self {
597 Self::new(
598 value.origin_product,
599 value.origin_category,
600 value.origin_service,
601 )
602 }
603}
604
605impl From<crate::config::OutputId> for OutputId {
606 fn from(value: crate::config::OutputId) -> Self {
607 Self {
608 component: value.component.into_id(),
609 port: value.port,
610 }
611 }
612}
613
614impl From<OutputId> for crate::config::OutputId {
615 fn from(value: OutputId) -> Self {
616 Self::from((value.component, value.port))
617 }
618}
619
620impl From<EventMetadata> for Metadata {
621 fn from(value: EventMetadata) -> Self {
622 let super::metadata::Inner {
623 value,
624 secrets,
625 source_id,
626 source_type,
627 upstream_id,
628 datadog_origin_metadata,
629 source_event_id,
630 ..
631 } = value.into_owned();
632
633 let secrets = (!secrets.is_empty()).then(|| secrets.into());
634
635 Self {
636 value: Some(encode_value(value)),
637 datadog_origin_metadata: datadog_origin_metadata.map(Into::into),
638 source_id: source_id.map(|s| s.to_string()),
639 source_type: source_type.map(|s| s.to_string()),
640 upstream_id: upstream_id.map(|id| id.as_ref().clone()).map(Into::into),
641 secrets,
642 source_event_id: source_event_id.map_or(vec![], std::convert::Into::into),
643 }
644 }
645}
646
647impl From<Metadata> for EventMetadata {
648 fn from(value: Metadata) -> Self {
649 let Metadata {
650 value: metadata_value,
651 source_id,
652 source_type,
653 upstream_id,
654 secrets,
655 datadog_origin_metadata,
656 source_event_id,
657 } = value;
658
659 let metadata_value = metadata_value.and_then(decode_value);
660 let source_id = source_id.map(|s| Arc::new(s.into()));
661 let upstream_id = upstream_id.map(|id| Arc::new(id.into()));
662 let secrets = secrets.map(Into::into);
663 let datadog_origin_metadata = datadog_origin_metadata.map(Into::into);
664 let source_event_id = if source_event_id.is_empty() {
665 None
666 } else {
667 match Uuid::from_slice(&source_event_id) {
668 Ok(id) => Some(id),
669 Err(error) => {
670 error!(
671 %error,
672 source_event_id = %String::from_utf8_lossy(&source_event_id),
673 "Failed to parse source_event_id.",
674 );
675 None
676 }
677 }
678 };
679
680 EventMetadata(Arc::new(Inner {
681 value: metadata_value.unwrap_or_else(|| vrl::value::Value::Object(ObjectMap::new())),
682 secrets: secrets.unwrap_or_default(),
683 finalizers: EventFinalizers::default(),
684 source_id,
685 source_type: source_type.map(Into::into),
686 upstream_id,
687 schema_definition: default_schema_definition(),
688 dropped_fields: ObjectMap::new(),
689 datadog_origin_metadata,
690 source_event_id,
691 }))
692 }
693}
694
695fn decode_value(input: Value) -> Option<super::Value> {
696 match input.kind {
697 Some(value::Kind::RawBytes(data)) => Some(super::Value::Bytes(data)),
698 Some(value::Kind::Timestamp(ts)) => Some(super::Value::Timestamp(
699 chrono::Utc
700 .timestamp_opt(ts.seconds, ts.nanos as u32)
701 .single()
702 .expect("invalid timestamp"),
703 )),
704 Some(value::Kind::Integer(value)) => Some(super::Value::Integer(value)),
705 Some(value::Kind::Float(value)) => Some(super::Value::Float(NotNan::new(value).unwrap())),
706 Some(value::Kind::Boolean(value)) => Some(super::Value::Boolean(value)),
707 Some(value::Kind::Map(map)) => decode_map(map.fields),
708 Some(value::Kind::Array(array)) => decode_array(array.items),
709 Some(value::Kind::Null(_)) => Some(super::Value::Null),
710 None => {
711 error!("Encoded event contains unknown value kind.");
712 None
713 }
714 }
715}
716
717fn decode_map(fields: BTreeMap<String, Value>) -> Option<super::Value> {
718 fields
719 .into_iter()
720 .map(|(key, value)| decode_value(value).map(|value| (key.into(), value)))
721 .collect::<Option<ObjectMap>>()
722 .map(event::Value::Object)
723}
724
725fn decode_array(items: Vec<Value>) -> Option<super::Value> {
726 items
727 .into_iter()
728 .map(decode_value)
729 .collect::<Option<Vec<_>>>()
730 .map(super::Value::Array)
731}
732
733fn encode_value(value: super::Value) -> Value {
734 Value {
735 kind: match value {
736 super::Value::Bytes(b) => Some(value::Kind::RawBytes(b)),
737 super::Value::Regex(regex) => Some(value::Kind::RawBytes(regex.as_bytes())),
738 super::Value::Timestamp(ts) => Some(value::Kind::Timestamp(prost_types::Timestamp {
739 seconds: ts.timestamp(),
740 nanos: ts.timestamp_subsec_nanos() as i32,
741 })),
742 super::Value::Integer(value) => Some(value::Kind::Integer(value)),
743 super::Value::Float(value) => Some(value::Kind::Float(value.into_inner())),
744 super::Value::Boolean(value) => Some(value::Kind::Boolean(value)),
745 super::Value::Object(fields) => Some(value::Kind::Map(encode_map(fields))),
746 super::Value::Array(items) => Some(value::Kind::Array(encode_array(items))),
747 super::Value::Null => Some(value::Kind::Null(ValueNull::NullValue as i32)),
748 },
749 }
750}
751
752fn encode_map(fields: ObjectMap) -> ValueMap {
753 ValueMap {
754 fields: fields
755 .into_iter()
756 .map(|(key, value)| (key.into(), encode_value(value)))
757 .collect(),
758 }
759}
760
761fn encode_array(items: Vec<super::Value>) -> ValueArray {
762 ValueArray {
763 items: items.into_iter().map(encode_value).collect(),
764 }
765}