opentelemetry_proto/
spans.rs

1use std::collections::BTreeMap;
2
3use chrono::{DateTime, TimeZone, Utc};
4use vector_core::event::{Event, TraceEvent};
5use vrl::{
6    event_path,
7    value::{KeyString, Value},
8};
9
10use super::{
11    common::{kv_list_into_value, to_hex},
12    proto::{
13        resource::v1::Resource,
14        trace::v1::{
15            ResourceSpans, Span, Status as SpanStatus,
16            span::{Event as SpanEvent, Link},
17        },
18    },
19};
20
/// Root-level event field holding the span's hex-encoded trace id.
pub const TRACE_ID_KEY: &str = "trace_id";
/// Root-level event field holding the span's hex-encoded span id.
pub const SPAN_ID_KEY: &str = "span_id";
/// Root-level event field holding the span's dropped-attributes counter.
pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
/// Root-level event field holding the resource attributes (only set when non-empty).
pub const RESOURCE_KEY: &str = "resources";
/// Root-level event field holding the span attributes (only set when non-empty).
pub const ATTRIBUTES_KEY: &str = "attributes";
26
27impl ResourceSpans {
28    pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
29        let resource = self.resource;
30        let now = Utc::now();
31
32        self.scope_spans
33            .into_iter()
34            .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans)
35            .map(move |span| {
36                ResourceSpan {
37                    resource: resource.clone(),
38                    span,
39                }
40                .into_event(now)
41            })
42    }
43}
44
45struct ResourceSpan {
46    resource: Option<Resource>,
47    span: Span,
48}
49
// Unlike log events (log body + metadata), trace spans are just metadata, so we don't handle
// `log_namespace` here; all attributes are inserted into the event root, just like
// datadog_agent/traces does.
52impl ResourceSpan {
53    fn into_event(self, now: DateTime<Utc>) -> Event {
54        let mut trace = TraceEvent::default();
55        let span = self.span;
56        trace.insert(
57            event_path!(TRACE_ID_KEY),
58            Value::from(to_hex(&span.trace_id)),
59        );
60        trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id)));
61        trace.insert(event_path!("trace_state"), span.trace_state);
62        trace.insert(
63            event_path!("parent_span_id"),
64            Value::from(to_hex(&span.parent_span_id)),
65        );
66        trace.insert(event_path!("name"), span.name);
67        trace.insert(event_path!("kind"), span.kind);
68        trace.insert(
69            event_path!("start_time_unix_nano"),
70            Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)),
71        );
72        trace.insert(
73            event_path!("end_time_unix_nano"),
74            Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)),
75        );
76        if !span.attributes.is_empty() {
77            trace.insert(
78                event_path!(ATTRIBUTES_KEY),
79                kv_list_into_value(span.attributes),
80            );
81        }
82        trace.insert(
83            event_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
84            Value::from(span.dropped_attributes_count),
85        );
86        if !span.events.is_empty() {
87            trace.insert(
88                event_path!("events"),
89                Value::Array(span.events.into_iter().map(Into::into).collect()),
90            );
91        }
92        trace.insert(
93            event_path!("dropped_events_count"),
94            Value::from(span.dropped_events_count),
95        );
96        if !span.links.is_empty() {
97            trace.insert(
98                event_path!("links"),
99                Value::Array(span.links.into_iter().map(Into::into).collect()),
100            );
101        }
102        trace.insert(
103            event_path!("dropped_links_count"),
104            Value::from(span.dropped_links_count),
105        );
106        trace.insert(event_path!("status"), Value::from(span.status));
107        if let Some(resource) = self.resource
108            && !resource.attributes.is_empty()
109        {
110            trace.insert(
111                event_path!(RESOURCE_KEY),
112                kv_list_into_value(resource.attributes),
113            );
114        }
115        trace.insert(event_path!("ingest_timestamp"), Value::from(now));
116        trace.into()
117    }
118}
119
120impl From<SpanEvent> for Value {
121    fn from(ev: SpanEvent) -> Self {
122        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
123        obj.insert("name".into(), ev.name.into());
124        obj.insert(
125            "time_unix_nano".into(),
126            Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)),
127        );
128        obj.insert("attributes".into(), kv_list_into_value(ev.attributes));
129        obj.insert(
130            "dropped_attributes_count".into(),
131            Value::Integer(ev.dropped_attributes_count as i64),
132        );
133        Value::Object(obj)
134    }
135}
136
137impl From<Link> for Value {
138    fn from(link: Link) -> Self {
139        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
140        obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id)));
141        obj.insert("span_id".into(), Value::from(to_hex(&link.span_id)));
142        obj.insert("trace_state".into(), link.trace_state.into());
143        obj.insert("attributes".into(), kv_list_into_value(link.attributes));
144        obj.insert(
145            "dropped_attributes_count".into(),
146            Value::Integer(link.dropped_attributes_count as i64),
147        );
148        Value::Object(obj)
149    }
150}
151
152impl From<SpanStatus> for Value {
153    fn from(status: SpanStatus) -> Self {
154        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
155        obj.insert("message".into(), status.message.into());
156        obj.insert("code".into(), status.code.into());
157        Value::Object(obj)
158    }
159}