1use bytes::Bytes;
2use opentelemetry_proto::proto::{
3 DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
4 RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
5 TRACES_REQUEST_MESSAGE_TYPE,
6};
7use smallvec::{SmallVec, smallvec};
8use vector_config::{configurable_component, indexmap::IndexSet};
9use vector_core::{
10 config::{DataType, LogNamespace},
11 event::Event,
12 schema,
13};
14use vrl::{protobuf::parse::Options, value::Kind};
15
16use super::{Deserializer, ProtobufDeserializer};
17
18#[configurable_component]
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21#[serde(rename_all = "snake_case")]
22pub enum OtlpSignalType {
23 Logs,
25 Metrics,
27 Traces,
29}
30
31#[configurable_component]
33#[derive(Debug, Clone)]
34pub struct OtlpDeserializerConfig {
35 #[serde(default = "default_signal_types")]
44 pub signal_types: IndexSet<OtlpSignalType>,
45}
46
47fn default_signal_types() -> IndexSet<OtlpSignalType> {
48 IndexSet::from([
49 OtlpSignalType::Logs,
50 OtlpSignalType::Metrics,
51 OtlpSignalType::Traces,
52 ])
53}
54
55impl Default for OtlpDeserializerConfig {
56 fn default() -> Self {
57 Self {
58 signal_types: default_signal_types(),
59 }
60 }
61}
62
63impl OtlpDeserializerConfig {
64 pub fn build(&self) -> OtlpDeserializer {
66 OtlpDeserializer::new_with_signals(self.signal_types.clone())
67 }
68
69 pub fn output_type(&self) -> DataType {
71 DataType::Log | DataType::Trace
72 }
73
74 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76 match log_namespace {
77 LogNamespace::Legacy => {
78 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
79 }
80 LogNamespace::Vector => {
81 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
82 }
83 }
84 }
85}
86
87#[derive(Debug, Clone)]
104pub struct OtlpDeserializer {
105 logs_deserializer: ProtobufDeserializer,
106 metrics_deserializer: ProtobufDeserializer,
107 traces_deserializer: ProtobufDeserializer,
108 signals: IndexSet<OtlpSignalType>,
110}
111
112impl Default for OtlpDeserializer {
113 fn default() -> Self {
114 Self::new_with_signals(default_signal_types())
115 }
116}
117
118impl OtlpDeserializer {
119 pub fn new_with_signals(signals: IndexSet<OtlpSignalType>) -> Self {
122 let options = Options {
123 use_json_names: true,
124 };
125
126 let logs_deserializer = ProtobufDeserializer::new_from_bytes(
127 DESCRIPTOR_BYTES,
128 LOGS_REQUEST_MESSAGE_TYPE,
129 options.clone(),
130 )
131 .expect("Failed to create logs deserializer");
132
133 let metrics_deserializer = ProtobufDeserializer::new_from_bytes(
134 DESCRIPTOR_BYTES,
135 METRICS_REQUEST_MESSAGE_TYPE,
136 options.clone(),
137 )
138 .expect("Failed to create metrics deserializer");
139
140 let traces_deserializer = ProtobufDeserializer::new_from_bytes(
141 DESCRIPTOR_BYTES,
142 TRACES_REQUEST_MESSAGE_TYPE,
143 options,
144 )
145 .expect("Failed to create traces deserializer");
146
147 Self {
148 logs_deserializer,
149 metrics_deserializer,
150 traces_deserializer,
151 signals,
152 }
153 }
154}
155
156impl Deserializer for OtlpDeserializer {
157 fn parse(
158 &self,
159 bytes: Bytes,
160 log_namespace: LogNamespace,
161 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
162 for signal_type in &self.signals {
164 match signal_type {
165 OtlpSignalType::Logs => {
166 if let Ok(events) = self.logs_deserializer.parse(bytes.clone(), log_namespace)
167 && let Some(Event::Log(log)) = events.first()
168 && log.get(RESOURCE_LOGS_JSON_FIELD).is_some()
169 {
170 return Ok(events);
171 }
172 }
173 OtlpSignalType::Metrics => {
174 if let Ok(events) = self
175 .metrics_deserializer
176 .parse(bytes.clone(), log_namespace)
177 && let Some(Event::Log(log)) = events.first()
178 && log.get(RESOURCE_METRICS_JSON_FIELD).is_some()
179 {
180 return Ok(events);
181 }
182 }
183 OtlpSignalType::Traces => {
184 if let Ok(mut events) =
185 self.traces_deserializer.parse(bytes.clone(), log_namespace)
186 && let Some(Event::Log(log)) = events.first()
187 && log.get(RESOURCE_SPANS_JSON_FIELD).is_some()
188 {
189 if let Some(Event::Log(log)) = events.pop() {
191 let trace_event = Event::Trace(log.into());
192 return Ok(smallvec![trace_event]);
193 }
194 }
195 }
196 }
197 }
198
199 Err(format!("Invalid OTLP data: expected one of {:?}", self.signals).into())
200 }
201}
202
203#[cfg(test)]
204mod tests {
205 use opentelemetry_proto::proto::{
206 collector::{
207 logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
208 trace::v1::ExportTraceServiceRequest,
209 },
210 logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
211 metrics::v1::{Metric, ResourceMetrics, ScopeMetrics},
212 resource::v1::Resource,
213 trace::v1::{ResourceSpans, ScopeSpans, Span},
214 };
215 use prost::Message;
216
217 use super::*;
218
219 const TEST_TRACE_ID: [u8; 16] = [
221 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
222 0x10,
223 ];
224 const TEST_SPAN_ID: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
226
227 fn create_logs_request_bytes() -> Bytes {
228 let request = ExportLogsServiceRequest {
229 resource_logs: vec![ResourceLogs {
230 resource: Some(Resource {
231 attributes: vec![],
232 dropped_attributes_count: 0,
233 }),
234 scope_logs: vec![ScopeLogs {
235 scope: None,
236 log_records: vec![LogRecord {
237 time_unix_nano: 1234567890,
238 severity_number: 9,
239 severity_text: "INFO".to_string(),
240 body: None,
241 attributes: vec![],
242 dropped_attributes_count: 0,
243 flags: 0,
244 trace_id: vec![],
245 span_id: vec![],
246 observed_time_unix_nano: 0,
247 }],
248 schema_url: String::new(),
249 }],
250 schema_url: String::new(),
251 }],
252 };
253
254 Bytes::from(request.encode_to_vec())
255 }
256
257 fn create_metrics_request_bytes() -> Bytes {
258 let request = ExportMetricsServiceRequest {
259 resource_metrics: vec![ResourceMetrics {
260 resource: Some(Resource {
261 attributes: vec![],
262 dropped_attributes_count: 0,
263 }),
264 scope_metrics: vec![ScopeMetrics {
265 scope: None,
266 metrics: vec![Metric {
267 name: "test_metric".to_string(),
268 description: String::new(),
269 unit: String::new(),
270 data: None,
271 }],
272 schema_url: String::new(),
273 }],
274 schema_url: String::new(),
275 }],
276 };
277
278 Bytes::from(request.encode_to_vec())
279 }
280
281 fn create_traces_request_bytes() -> Bytes {
282 let request = ExportTraceServiceRequest {
283 resource_spans: vec![ResourceSpans {
284 resource: Some(Resource {
285 attributes: vec![],
286 dropped_attributes_count: 0,
287 }),
288 scope_spans: vec![ScopeSpans {
289 scope: None,
290 spans: vec![Span {
291 trace_id: TEST_TRACE_ID.to_vec(),
292 span_id: TEST_SPAN_ID.to_vec(),
293 trace_state: String::new(),
294 parent_span_id: vec![],
295 name: "test_span".to_string(),
296 kind: 0,
297 start_time_unix_nano: 1234567890,
298 end_time_unix_nano: 1234567900,
299 attributes: vec![],
300 dropped_attributes_count: 0,
301 events: vec![],
302 dropped_events_count: 0,
303 links: vec![],
304 dropped_links_count: 0,
305 status: None,
306 }],
307 schema_url: String::new(),
308 }],
309 schema_url: String::new(),
310 }],
311 };
312
313 Bytes::from(request.encode_to_vec())
314 }
315
316 fn validate_trace_ids(trace: &vrl::value::Value) {
317 let resource_spans = trace
319 .get("resourceSpans")
320 .and_then(|v| v.as_array())
321 .expect("resourceSpans should be an array");
322
323 let first_rs = resource_spans
324 .first()
325 .expect("should have at least one resource span");
326
327 let scope_spans = first_rs
328 .get("scopeSpans")
329 .and_then(|v| v.as_array())
330 .expect("scopeSpans should be an array");
331
332 let first_ss = scope_spans
333 .first()
334 .expect("should have at least one scope span");
335
336 let spans = first_ss
337 .get("spans")
338 .and_then(|v| v.as_array())
339 .expect("spans should be an array");
340
341 let span = spans.first().expect("should have at least one span");
342
343 let trace_id = span
345 .get("traceId")
346 .and_then(|v| v.as_bytes())
347 .expect("traceId should exist and be bytes");
348
349 assert_eq!(
350 trace_id.as_ref(),
351 &TEST_TRACE_ID,
352 "traceId should match the expected 16 bytes (0102030405060708090a0b0c0d0e0f10)"
353 );
354
355 let span_id = span
357 .get("spanId")
358 .and_then(|v| v.as_bytes())
359 .expect("spanId should exist and be bytes");
360
361 assert_eq!(
362 span_id.as_ref(),
363 &TEST_SPAN_ID,
364 "spanId should match the expected 8 bytes (0102030405060708)"
365 );
366 }
367
368 fn assert_otlp_event(bytes: Bytes, field: &str, is_trace: bool) {
369 let deserializer = OtlpDeserializer::default();
370 let events = deserializer.parse(bytes, LogNamespace::Legacy).unwrap();
371
372 assert_eq!(events.len(), 1);
373 if is_trace {
374 assert!(matches!(events[0], Event::Trace(_)));
375 let trace = events[0].as_trace();
376 assert!(trace.get(field).is_some());
377 validate_trace_ids(trace.value());
378 } else {
379 assert!(events[0].as_log().get(field).is_some());
380 }
381 }
382
383 #[test]
384 fn deserialize_otlp_logs() {
385 assert_otlp_event(create_logs_request_bytes(), RESOURCE_LOGS_JSON_FIELD, false);
386 }
387
388 #[test]
389 fn deserialize_otlp_metrics() {
390 assert_otlp_event(
391 create_metrics_request_bytes(),
392 RESOURCE_METRICS_JSON_FIELD,
393 false,
394 );
395 }
396
397 #[test]
398 fn deserialize_otlp_traces() {
399 assert_otlp_event(
400 create_traces_request_bytes(),
401 RESOURCE_SPANS_JSON_FIELD,
402 true,
403 );
404 }
405
406 #[test]
407 fn deserialize_invalid_otlp() {
408 let deserializer = OtlpDeserializer::default();
409 let bytes = Bytes::from("invalid protobuf data");
410 let result = deserializer.parse(bytes, LogNamespace::Legacy);
411
412 assert!(result.is_err());
413 assert!(
414 result
415 .unwrap_err()
416 .to_string()
417 .contains("Invalid OTLP data")
418 );
419 }
420
421 #[test]
422 fn deserialize_with_custom_priority_traces_only() {
423 let deserializer =
425 OtlpDeserializer::new_with_signals(IndexSet::from([OtlpSignalType::Traces]));
426
427 let trace_bytes = create_traces_request_bytes();
429 let result = deserializer.parse(trace_bytes, LogNamespace::Legacy);
430 assert!(result.is_ok());
431 assert!(matches!(result.unwrap()[0], Event::Trace(_)));
432
433 let log_bytes = create_logs_request_bytes();
435 let result = deserializer.parse(log_bytes, LogNamespace::Legacy);
436 assert!(result.is_err());
437 }
438}