opentelemetry_proto/
spans.rs

1use super::common::{kv_list_into_value, to_hex};
2use super::proto::{
3    resource::v1::Resource,
4    trace::v1::{
5        span::{Event as SpanEvent, Link},
6        ResourceSpans, Span, Status as SpanStatus,
7    },
8};
9use chrono::{DateTime, TimeZone, Utc};
10use std::collections::BTreeMap;
11use vector_core::event::{Event, TraceEvent};
12use vrl::{
13    event_path,
14    value::{KeyString, Value},
15};
16
17pub const TRACE_ID_KEY: &str = "trace_id";
18pub const SPAN_ID_KEY: &str = "span_id";
19pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
20pub const RESOURCE_KEY: &str = "resources";
21pub const ATTRIBUTES_KEY: &str = "attributes";
22
23impl ResourceSpans {
24    pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
25        let resource = self.resource;
26        let now = Utc::now();
27
28        self.scope_spans
29            .into_iter()
30            .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans)
31            .map(move |span| {
32                ResourceSpan {
33                    resource: resource.clone(),
34                    span,
35                }
36                .into_event(now)
37            })
38    }
39}
40
41struct ResourceSpan {
42    resource: Option<Resource>,
43    span: Span,
44}
45
46// Unlike log events(log body + metadata), trace spans are just metadata, so we don't handle log_namespace here,
47// insert all attributes into log root, just like what datadog_agent/traces does.
48impl ResourceSpan {
49    fn into_event(self, now: DateTime<Utc>) -> Event {
50        let mut trace = TraceEvent::default();
51        let span = self.span;
52        trace.insert(
53            event_path!(TRACE_ID_KEY),
54            Value::from(to_hex(&span.trace_id)),
55        );
56        trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id)));
57        trace.insert(event_path!("trace_state"), span.trace_state);
58        trace.insert(
59            event_path!("parent_span_id"),
60            Value::from(to_hex(&span.parent_span_id)),
61        );
62        trace.insert(event_path!("name"), span.name);
63        trace.insert(event_path!("kind"), span.kind);
64        trace.insert(
65            event_path!("start_time_unix_nano"),
66            Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)),
67        );
68        trace.insert(
69            event_path!("end_time_unix_nano"),
70            Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)),
71        );
72        if !span.attributes.is_empty() {
73            trace.insert(
74                event_path!(ATTRIBUTES_KEY),
75                kv_list_into_value(span.attributes),
76            );
77        }
78        trace.insert(
79            event_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
80            Value::from(span.dropped_attributes_count),
81        );
82        if !span.events.is_empty() {
83            trace.insert(
84                event_path!("events"),
85                Value::Array(span.events.into_iter().map(Into::into).collect()),
86            );
87        }
88        trace.insert(
89            event_path!("dropped_events_count"),
90            Value::from(span.dropped_events_count),
91        );
92        if !span.links.is_empty() {
93            trace.insert(
94                event_path!("links"),
95                Value::Array(span.links.into_iter().map(Into::into).collect()),
96            );
97        }
98        trace.insert(
99            event_path!("dropped_links_count"),
100            Value::from(span.dropped_links_count),
101        );
102        trace.insert(event_path!("status"), Value::from(span.status));
103        if let Some(resource) = self.resource {
104            if !resource.attributes.is_empty() {
105                trace.insert(
106                    event_path!(RESOURCE_KEY),
107                    kv_list_into_value(resource.attributes),
108                );
109            }
110        }
111        trace.insert(event_path!("ingest_timestamp"), Value::from(now));
112        trace.into()
113    }
114}
115
116impl From<SpanEvent> for Value {
117    fn from(ev: SpanEvent) -> Self {
118        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
119        obj.insert("name".into(), ev.name.into());
120        obj.insert(
121            "time_unix_nano".into(),
122            Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)),
123        );
124        obj.insert("attributes".into(), kv_list_into_value(ev.attributes));
125        obj.insert(
126            "dropped_attributes_count".into(),
127            Value::Integer(ev.dropped_attributes_count as i64),
128        );
129        Value::Object(obj)
130    }
131}
132
133impl From<Link> for Value {
134    fn from(link: Link) -> Self {
135        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
136        obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id)));
137        obj.insert("span_id".into(), Value::from(to_hex(&link.span_id)));
138        obj.insert("trace_state".into(), link.trace_state.into());
139        obj.insert("attributes".into(), kv_list_into_value(link.attributes));
140        obj.insert(
141            "dropped_attributes_count".into(),
142            Value::Integer(link.dropped_attributes_count as i64),
143        );
144        Value::Object(obj)
145    }
146}
147
148impl From<SpanStatus> for Value {
149    fn from(status: SpanStatus) -> Self {
150        let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
151        obj.insert("message".into(), status.message.into());
152        obj.insert("code".into(), status.code.into());
153        Value::Object(obj)
154    }
155}