1use super::common::{kv_list_into_value, to_hex};
2use crate::proto::{
3 common::v1::{any_value::Value as PBValue, InstrumentationScope},
4 logs::v1::{LogRecord, ResourceLogs, SeverityNumber},
5 resource::v1::Resource,
6};
7use bytes::Bytes;
8use chrono::{DateTime, TimeZone, Utc};
9use vector_core::{
10 config::{log_schema, LegacyKey, LogNamespace},
11 event::{Event, LogEvent},
12};
13use vrl::core::Value;
14use vrl::path;
15
16const SOURCE_NAME: &str = "opentelemetry";
17pub const RESOURCE_KEY: &str = "resources";
18pub const ATTRIBUTES_KEY: &str = "attributes";
19pub const SCOPE_KEY: &str = "scope";
20pub const NAME_KEY: &str = "name";
21pub const VERSION_KEY: &str = "version";
22pub const TRACE_ID_KEY: &str = "trace_id";
23pub const SPAN_ID_KEY: &str = "span_id";
24pub const SEVERITY_TEXT_KEY: &str = "severity_text";
25pub const SEVERITY_NUMBER_KEY: &str = "severity_number";
26pub const OBSERVED_TIMESTAMP_KEY: &str = "observed_timestamp";
27pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
28pub const FLAGS_KEY: &str = "flags";
29
30impl ResourceLogs {
31 pub fn into_event_iter(self, log_namespace: LogNamespace) -> impl Iterator<Item = Event> {
32 let now = Utc::now();
33
34 self.scope_logs.into_iter().flat_map(move |scope_log| {
35 let scope = scope_log.scope;
36 let resource = self.resource.clone();
37 scope_log.log_records.into_iter().map(move |log_record| {
38 ResourceLog {
39 resource: resource.clone(),
40 scope: scope.clone(),
41 log_record,
42 }
43 .into_event(log_namespace, now)
44 })
45 })
46 }
47}
48
49struct ResourceLog {
50 resource: Option<Resource>,
51 scope: Option<InstrumentationScope>,
52 log_record: LogRecord,
53}
54
55impl ResourceLog {
57 fn into_event(self, log_namespace: LogNamespace, now: DateTime<Utc>) -> Event {
58 let mut log = match log_namespace {
59 LogNamespace::Vector => {
60 if let Some(v) = self.log_record.body.and_then(|av| av.value) {
61 LogEvent::from(<PBValue as Into<Value>>::into(v))
62 } else {
63 LogEvent::from(Value::Null)
64 }
65 }
66 LogNamespace::Legacy => {
67 let mut log = LogEvent::default();
68 if let Some(v) = self.log_record.body.and_then(|av| av.value) {
69 log.maybe_insert(log_schema().message_key_target_path(), v);
70 }
71 log
72 }
73 };
74
75 if let Some(scope) = self.scope {
77 if !scope.name.is_empty() {
78 log_namespace.insert_source_metadata(
79 SOURCE_NAME,
80 &mut log,
81 Some(LegacyKey::Overwrite(path!(SCOPE_KEY, NAME_KEY))),
82 path!(SCOPE_KEY, NAME_KEY),
83 scope.name,
84 );
85 }
86 if !scope.version.is_empty() {
87 log_namespace.insert_source_metadata(
88 SOURCE_NAME,
89 &mut log,
90 Some(LegacyKey::Overwrite(path!(SCOPE_KEY, VERSION_KEY))),
91 path!(SCOPE_KEY, VERSION_KEY),
92 scope.version,
93 );
94 }
95 if !scope.attributes.is_empty() {
96 log_namespace.insert_source_metadata(
97 SOURCE_NAME,
98 &mut log,
99 Some(LegacyKey::Overwrite(path!(SCOPE_KEY, ATTRIBUTES_KEY))),
100 path!(SCOPE_KEY, ATTRIBUTES_KEY),
101 kv_list_into_value(scope.attributes),
102 );
103 }
104 if scope.dropped_attributes_count > 0 {
105 log_namespace.insert_source_metadata(
106 SOURCE_NAME,
107 &mut log,
108 Some(LegacyKey::Overwrite(path!(
109 SCOPE_KEY,
110 DROPPED_ATTRIBUTES_COUNT_KEY
111 ))),
112 path!(SCOPE_KEY, DROPPED_ATTRIBUTES_COUNT_KEY),
113 scope.dropped_attributes_count,
114 );
115 }
116 }
117
118 if let Some(resource) = self.resource {
120 if !resource.attributes.is_empty() {
121 log_namespace.insert_source_metadata(
122 SOURCE_NAME,
123 &mut log,
124 Some(LegacyKey::Overwrite(path!(RESOURCE_KEY))),
125 path!(RESOURCE_KEY),
126 kv_list_into_value(resource.attributes),
127 );
128 }
129 }
130 if !self.log_record.attributes.is_empty() {
131 log_namespace.insert_source_metadata(
132 SOURCE_NAME,
133 &mut log,
134 Some(LegacyKey::Overwrite(path!(ATTRIBUTES_KEY))),
135 path!(ATTRIBUTES_KEY),
136 kv_list_into_value(self.log_record.attributes),
137 );
138 }
139 if !self.log_record.trace_id.is_empty() {
140 log_namespace.insert_source_metadata(
141 SOURCE_NAME,
142 &mut log,
143 Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))),
144 path!(TRACE_ID_KEY),
145 Bytes::from(to_hex(&self.log_record.trace_id)),
146 );
147 }
148 if !self.log_record.span_id.is_empty() {
149 log_namespace.insert_source_metadata(
150 SOURCE_NAME,
151 &mut log,
152 Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))),
153 path!(SPAN_ID_KEY),
154 Bytes::from(to_hex(&self.log_record.span_id)),
155 );
156 }
157 if !self.log_record.severity_text.is_empty() {
158 log_namespace.insert_source_metadata(
159 SOURCE_NAME,
160 &mut log,
161 Some(LegacyKey::Overwrite(path!(SEVERITY_TEXT_KEY))),
162 path!(SEVERITY_TEXT_KEY),
163 self.log_record.severity_text,
164 );
165 }
166 if self.log_record.severity_number != SeverityNumber::Unspecified as i32 {
167 log_namespace.insert_source_metadata(
168 SOURCE_NAME,
169 &mut log,
170 Some(LegacyKey::Overwrite(path!(SEVERITY_NUMBER_KEY))),
171 path!(SEVERITY_NUMBER_KEY),
172 self.log_record.severity_number,
173 );
174 }
175 if self.log_record.flags > 0 {
176 log_namespace.insert_source_metadata(
177 SOURCE_NAME,
178 &mut log,
179 Some(LegacyKey::Overwrite(path!(FLAGS_KEY))),
180 path!(FLAGS_KEY),
181 self.log_record.flags,
182 );
183 }
184
185 log_namespace.insert_source_metadata(
186 SOURCE_NAME,
187 &mut log,
188 Some(LegacyKey::Overwrite(path!(DROPPED_ATTRIBUTES_COUNT_KEY))),
189 path!(DROPPED_ATTRIBUTES_COUNT_KEY),
190 self.log_record.dropped_attributes_count,
191 );
192
193 let observed_timestamp = if self.log_record.observed_time_unix_nano > 0 {
196 Utc.timestamp_nanos(self.log_record.observed_time_unix_nano as i64)
197 .into()
198 } else {
199 Value::Timestamp(now)
200 };
201 log_namespace.insert_source_metadata(
202 SOURCE_NAME,
203 &mut log,
204 Some(LegacyKey::Overwrite(path!(OBSERVED_TIMESTAMP_KEY))),
205 path!(OBSERVED_TIMESTAMP_KEY),
206 observed_timestamp.clone(),
207 );
208
209 let timestamp = if self.log_record.time_unix_nano > 0 {
211 Utc.timestamp_nanos(self.log_record.time_unix_nano as i64)
212 .into()
213 } else {
214 observed_timestamp
215 };
216 log_namespace.insert_source_metadata(
217 SOURCE_NAME,
218 &mut log,
219 log_schema().timestamp_key().map(LegacyKey::Overwrite),
220 path!("timestamp"),
221 timestamp,
222 );
223
224 log_namespace.insert_vector_metadata(
225 &mut log,
226 log_schema().source_type_key(),
227 path!("source_type"),
228 Bytes::from_static(SOURCE_NAME.as_bytes()),
229 );
230 if log_namespace == LogNamespace::Vector {
231 log.metadata_mut()
232 .value_mut()
233 .insert(path!("vector", "ingest_timestamp"), now);
234 }
235
236 log.into()
237 }
238}