opentelemetry_proto/
spans.rs1use std::collections::BTreeMap;
2
3use chrono::{DateTime, TimeZone, Utc};
4use vector_core::event::{Event, TraceEvent};
5use vrl::{
6 event_path,
7 value::{KeyString, Value},
8};
9
10use super::{
11 common::{kv_list_into_value, to_hex},
12 proto::{
13 resource::v1::Resource,
14 trace::v1::{
15 ResourceSpans, Span, Status as SpanStatus,
16 span::{Event as SpanEvent, Link},
17 },
18 },
19};
20
/// Event field that receives the hex-encoded trace ID of a span.
pub const TRACE_ID_KEY: &str = "trace_id";
/// Event field that receives the hex-encoded span ID of a span.
pub const SPAN_ID_KEY: &str = "span_id";
/// Event field that receives the span's `dropped_attributes_count`.
pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
/// Event field that receives the attributes of the span's resource (written only when
/// the resource is present and has at least one attribute).
pub const RESOURCE_KEY: &str = "resources";
/// Event field that receives the span's own attributes (written only when non-empty).
pub const ATTRIBUTES_KEY: &str = "attributes";
26
27impl ResourceSpans {
28 pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
29 let resource = self.resource;
30 let now = Utc::now();
31
32 self.scope_spans
33 .into_iter()
34 .flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans)
35 .map(move |span| {
36 ResourceSpan {
37 resource: resource.clone(),
38 span,
39 }
40 .into_event(now)
41 })
42 }
43}
44
45struct ResourceSpan {
46 resource: Option<Resource>,
47 span: Span,
48}
49
50impl ResourceSpan {
53 fn into_event(self, now: DateTime<Utc>) -> Event {
54 let mut trace = TraceEvent::default();
55 let span = self.span;
56 trace.insert(
57 event_path!(TRACE_ID_KEY),
58 Value::from(to_hex(&span.trace_id)),
59 );
60 trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id)));
61 trace.insert(event_path!("trace_state"), span.trace_state);
62 trace.insert(
63 event_path!("parent_span_id"),
64 Value::from(to_hex(&span.parent_span_id)),
65 );
66 trace.insert(event_path!("name"), span.name);
67 trace.insert(event_path!("kind"), span.kind);
68 trace.insert(
69 event_path!("start_time_unix_nano"),
70 Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)),
71 );
72 trace.insert(
73 event_path!("end_time_unix_nano"),
74 Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)),
75 );
76 if !span.attributes.is_empty() {
77 trace.insert(
78 event_path!(ATTRIBUTES_KEY),
79 kv_list_into_value(span.attributes),
80 );
81 }
82 trace.insert(
83 event_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
84 Value::from(span.dropped_attributes_count),
85 );
86 if !span.events.is_empty() {
87 trace.insert(
88 event_path!("events"),
89 Value::Array(span.events.into_iter().map(Into::into).collect()),
90 );
91 }
92 trace.insert(
93 event_path!("dropped_events_count"),
94 Value::from(span.dropped_events_count),
95 );
96 if !span.links.is_empty() {
97 trace.insert(
98 event_path!("links"),
99 Value::Array(span.links.into_iter().map(Into::into).collect()),
100 );
101 }
102 trace.insert(
103 event_path!("dropped_links_count"),
104 Value::from(span.dropped_links_count),
105 );
106 trace.insert(event_path!("status"), Value::from(span.status));
107 if let Some(resource) = self.resource
108 && !resource.attributes.is_empty()
109 {
110 trace.insert(
111 event_path!(RESOURCE_KEY),
112 kv_list_into_value(resource.attributes),
113 );
114 }
115 trace.insert(event_path!("ingest_timestamp"), Value::from(now));
116 trace.into()
117 }
118}
119
120impl From<SpanEvent> for Value {
121 fn from(ev: SpanEvent) -> Self {
122 let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
123 obj.insert("name".into(), ev.name.into());
124 obj.insert(
125 "time_unix_nano".into(),
126 Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)),
127 );
128 obj.insert("attributes".into(), kv_list_into_value(ev.attributes));
129 obj.insert(
130 "dropped_attributes_count".into(),
131 Value::Integer(ev.dropped_attributes_count as i64),
132 );
133 Value::Object(obj)
134 }
135}
136
137impl From<Link> for Value {
138 fn from(link: Link) -> Self {
139 let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
140 obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id)));
141 obj.insert("span_id".into(), Value::from(to_hex(&link.span_id)));
142 obj.insert("trace_state".into(), link.trace_state.into());
143 obj.insert("attributes".into(), kv_list_into_value(link.attributes));
144 obj.insert(
145 "dropped_attributes_count".into(),
146 Value::Integer(link.dropped_attributes_count as i64),
147 );
148 Value::Object(obj)
149 }
150}
151
152impl From<SpanStatus> for Value {
153 fn from(status: SpanStatus) -> Self {
154 let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
155 obj.insert("message".into(), status.message.into());
156 obj.insert("code".into(), status.code.into());
157 Value::Object(obj)
158 }
159}