codecs/decoding/format/
otlp.rs

1use bytes::Bytes;
2use opentelemetry_proto::proto::{
3    DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
4    RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
5    TRACES_REQUEST_MESSAGE_TYPE,
6};
7use smallvec::{SmallVec, smallvec};
8use vector_config::{configurable_component, indexmap::IndexSet};
9use vector_core::{
10    config::{DataType, LogNamespace},
11    event::Event,
12    schema,
13};
14use vrl::{protobuf::parse::Options, value::Kind};
15
16use super::{Deserializer, ProtobufDeserializer};
17
18/// OTLP signal type for prioritized parsing.
19#[configurable_component]
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21#[serde(rename_all = "snake_case")]
22pub enum OtlpSignalType {
23    /// OTLP logs signal (ExportLogsServiceRequest)
24    Logs,
25    /// OTLP metrics signal (ExportMetricsServiceRequest)
26    Metrics,
27    /// OTLP traces signal (ExportTraceServiceRequest)
28    Traces,
29}
30
31/// Config used to build an `OtlpDeserializer`.
32#[configurable_component]
33#[derive(Debug, Clone)]
34pub struct OtlpDeserializerConfig {
35    /// Signal types to attempt parsing, in priority order.
36    ///
37    /// The deserializer will try parsing in the order specified. This allows you to optimize
38    /// performance when you know the expected signal types. For example, if you only receive
39    /// traces, set this to `["traces"]` to avoid attempting to parse as logs or metrics first.
40    ///
41    /// If not specified, defaults to trying all types in order: logs, metrics, traces.
42    /// Duplicate signal types are automatically removed while preserving order.
43    #[serde(default = "default_signal_types")]
44    pub signal_types: IndexSet<OtlpSignalType>,
45}
46
47fn default_signal_types() -> IndexSet<OtlpSignalType> {
48    IndexSet::from([
49        OtlpSignalType::Logs,
50        OtlpSignalType::Metrics,
51        OtlpSignalType::Traces,
52    ])
53}
54
55impl Default for OtlpDeserializerConfig {
56    fn default() -> Self {
57        Self {
58            signal_types: default_signal_types(),
59        }
60    }
61}
62
63impl OtlpDeserializerConfig {
64    /// Build the `OtlpDeserializer` from this configuration.
65    pub fn build(&self) -> OtlpDeserializer {
66        OtlpDeserializer::new_with_signals(self.signal_types.clone())
67    }
68
69    /// Return the type of event build by this deserializer.
70    pub fn output_type(&self) -> DataType {
71        DataType::Log | DataType::Trace
72    }
73
74    /// The schema produced by the deserializer.
75    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
76        match log_namespace {
77            LogNamespace::Legacy => {
78                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
79            }
80            LogNamespace::Vector => {
81                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
82            }
83        }
84    }
85}
86
87/// Deserializer that builds `Event`s from a byte frame containing [OTLP](https://opentelemetry.io/docs/specs/otlp/) protobuf data.
88///
89/// This deserializer decodes events using the OTLP protobuf specification. It handles the three
90/// OTLP signal types: logs, metrics, and traces.
91///
92/// The implementation supports three OTLP message types:
93/// - `ExportLogsServiceRequest` → Log events with `resourceLogs` field
94/// - `ExportMetricsServiceRequest` → Log events with `resourceMetrics` field
95/// - `ExportTraceServiceRequest` → Trace events with `resourceSpans` field
96///
97/// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format.
98/// This means that components that work on metrics, will not be compatible with this output.
99/// However, these events can be forwarded directly to a downstream OTEL collector.
100///
101/// This is the inverse of what the OTLP encoder does, ensuring round-trip compatibility
102/// with the `opentelemetry` source when `use_otlp_decoding` is enabled.
103#[derive(Debug, Clone)]
104pub struct OtlpDeserializer {
105    logs_deserializer: ProtobufDeserializer,
106    metrics_deserializer: ProtobufDeserializer,
107    traces_deserializer: ProtobufDeserializer,
108    /// Signal types to parse, in priority order
109    signals: IndexSet<OtlpSignalType>,
110}
111
112impl Default for OtlpDeserializer {
113    fn default() -> Self {
114        Self::new_with_signals(default_signal_types())
115    }
116}
117
118impl OtlpDeserializer {
119    /// Creates a new OTLP deserializer with custom signal support.
120    /// During parsing, each signal type is tried in order until one succeeds.
121    pub fn new_with_signals(signals: IndexSet<OtlpSignalType>) -> Self {
122        let options = Options {
123            use_json_names: true,
124        };
125
126        let logs_deserializer = ProtobufDeserializer::new_from_bytes(
127            DESCRIPTOR_BYTES,
128            LOGS_REQUEST_MESSAGE_TYPE,
129            options.clone(),
130        )
131        .expect("Failed to create logs deserializer");
132
133        let metrics_deserializer = ProtobufDeserializer::new_from_bytes(
134            DESCRIPTOR_BYTES,
135            METRICS_REQUEST_MESSAGE_TYPE,
136            options.clone(),
137        )
138        .expect("Failed to create metrics deserializer");
139
140        let traces_deserializer = ProtobufDeserializer::new_from_bytes(
141            DESCRIPTOR_BYTES,
142            TRACES_REQUEST_MESSAGE_TYPE,
143            options,
144        )
145        .expect("Failed to create traces deserializer");
146
147        Self {
148            logs_deserializer,
149            metrics_deserializer,
150            traces_deserializer,
151            signals,
152        }
153    }
154}
155
156impl Deserializer for OtlpDeserializer {
157    fn parse(
158        &self,
159        bytes: Bytes,
160        log_namespace: LogNamespace,
161    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
162        // Try parsing in the priority order specified
163        for signal_type in &self.signals {
164            match signal_type {
165                OtlpSignalType::Logs => {
166                    if let Ok(events) = self.logs_deserializer.parse(bytes.clone(), log_namespace)
167                        && let Some(Event::Log(log)) = events.first()
168                        && log.get(RESOURCE_LOGS_JSON_FIELD).is_some()
169                    {
170                        return Ok(events);
171                    }
172                }
173                OtlpSignalType::Metrics => {
174                    if let Ok(events) = self
175                        .metrics_deserializer
176                        .parse(bytes.clone(), log_namespace)
177                        && let Some(Event::Log(log)) = events.first()
178                        && log.get(RESOURCE_METRICS_JSON_FIELD).is_some()
179                    {
180                        return Ok(events);
181                    }
182                }
183                OtlpSignalType::Traces => {
184                    if let Ok(mut events) =
185                        self.traces_deserializer.parse(bytes.clone(), log_namespace)
186                        && let Some(Event::Log(log)) = events.first()
187                        && log.get(RESOURCE_SPANS_JSON_FIELD).is_some()
188                    {
189                        // Convert the log event to a trace event by taking ownership
190                        if let Some(Event::Log(log)) = events.pop() {
191                            let trace_event = Event::Trace(log.into());
192                            return Ok(smallvec![trace_event]);
193                        }
194                    }
195                }
196            }
197        }
198
199        Err(format!("Invalid OTLP data: expected one of {:?}", self.signals).into())
200    }
201}
202
#[cfg(test)]
mod tests {
    use opentelemetry_proto::proto::{
        collector::{
            logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest,
            trace::v1::ExportTraceServiceRequest,
        },
        logs::v1::{LogRecord, ResourceLogs, ScopeLogs},
        metrics::v1::{Metric, ResourceMetrics, ScopeMetrics},
        resource::v1::Resource,
        trace::v1::{ResourceSpans, ScopeSpans, Span},
    };
    use prost::Message;

    use super::*;

    // trace_id: 0102030405060708090a0b0c0d0e0f10 (16 bytes)
    const TEST_TRACE_ID: [u8; 16] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
        0x10,
    ];
    // span_id: 0102030405060708 (8 bytes)
    const TEST_SPAN_ID: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];

    /// Encodes a logs export request holding one resource, one scope, and one INFO record.
    /// Fields not listed are left at their protobuf defaults.
    fn create_logs_request_bytes() -> Bytes {
        let record = LogRecord {
            time_unix_nano: 1234567890,
            severity_number: 9,
            severity_text: "INFO".to_string(),
            ..Default::default()
        };
        let scope_logs = ScopeLogs {
            log_records: vec![record],
            ..Default::default()
        };
        let resource_logs = ResourceLogs {
            resource: Some(Resource::default()),
            scope_logs: vec![scope_logs],
            ..Default::default()
        };
        let request = ExportLogsServiceRequest {
            resource_logs: vec![resource_logs],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a metrics export request holding one resource, one scope, and one metric named
    /// `test_metric` with no data. Fields not listed are left at their protobuf defaults.
    fn create_metrics_request_bytes() -> Bytes {
        let metric = Metric {
            name: "test_metric".to_string(),
            ..Default::default()
        };
        let scope_metrics = ScopeMetrics {
            metrics: vec![metric],
            ..Default::default()
        };
        let resource_metrics = ResourceMetrics {
            resource: Some(Resource::default()),
            scope_metrics: vec![scope_metrics],
            ..Default::default()
        };
        let request = ExportMetricsServiceRequest {
            resource_metrics: vec![resource_metrics],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Encodes a trace export request holding one resource, one scope, and one span that uses
    /// `TEST_TRACE_ID` and `TEST_SPAN_ID`. Fields not listed are left at their protobuf defaults.
    fn create_traces_request_bytes() -> Bytes {
        let span = Span {
            trace_id: TEST_TRACE_ID.to_vec(),
            span_id: TEST_SPAN_ID.to_vec(),
            name: "test_span".to_string(),
            start_time_unix_nano: 1234567890,
            end_time_unix_nano: 1234567900,
            ..Default::default()
        };
        let scope_spans = ScopeSpans {
            spans: vec![span],
            ..Default::default()
        };
        let resource_spans = ResourceSpans {
            resource: Some(Resource::default()),
            scope_spans: vec![scope_spans],
            ..Default::default()
        };
        let request = ExportTraceServiceRequest {
            resource_spans: vec![resource_spans],
        };

        Bytes::from(request.encode_to_vec())
    }

    /// Returns the first element of the array stored under `key` in `parent`, panicking with
    /// `not_array` when the key is missing or not an array, and with `empty` when it has no
    /// elements.
    fn first_in_array<'v>(
        parent: &'v vrl::value::Value,
        key: &'static str,
        not_array: &str,
        empty: &str,
    ) -> &'v vrl::value::Value {
        parent
            .get(key)
            .and_then(|v| v.as_array())
            .expect(not_array)
            .first()
            .expect(empty)
    }

    /// Walks `resourceSpans[0].scopeSpans[0].spans[0]` and checks that its `traceId` and
    /// `spanId` are the raw test byte sequences.
    fn validate_trace_ids(trace: &vrl::value::Value) {
        let first_rs = first_in_array(
            trace,
            "resourceSpans",
            "resourceSpans should be an array",
            "should have at least one resource span",
        );
        let first_ss = first_in_array(
            first_rs,
            "scopeSpans",
            "scopeSpans should be an array",
            "should have at least one scope span",
        );
        let span = first_in_array(
            first_ss,
            "spans",
            "spans should be an array",
            "should have at least one span",
        );

        // traceId is expected as raw bytes (16 of them).
        let trace_id = span
            .get("traceId")
            .and_then(|v| v.as_bytes())
            .expect("traceId should exist and be bytes");
        assert_eq!(
            trace_id.as_ref(),
            &TEST_TRACE_ID,
            "traceId should match the expected 16 bytes (0102030405060708090a0b0c0d0e0f10)"
        );

        // spanId is expected as raw bytes (8 of them).
        let span_id = span
            .get("spanId")
            .and_then(|v| v.as_bytes())
            .expect("spanId should exist and be bytes");
        assert_eq!(
            span_id.as_ref(),
            &TEST_SPAN_ID,
            "spanId should match the expected 8 bytes (0102030405060708)"
        );
    }

    /// Parses `bytes` with the default deserializer and checks that exactly one event comes
    /// back carrying `field`; trace events additionally get their ids validated.
    fn assert_otlp_event(bytes: Bytes, field: &str, is_trace: bool) {
        let events = OtlpDeserializer::default()
            .parse(bytes, LogNamespace::Legacy)
            .unwrap();
        assert_eq!(events.len(), 1);

        let event = &events[0];
        if !is_trace {
            assert!(event.as_log().get(field).is_some());
            return;
        }

        assert!(matches!(event, Event::Trace(_)));
        let trace = event.as_trace();
        assert!(trace.get(field).is_some());
        validate_trace_ids(trace.value());
    }

    #[test]
    fn deserialize_otlp_logs() {
        let bytes = create_logs_request_bytes();
        assert_otlp_event(bytes, RESOURCE_LOGS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_metrics() {
        let bytes = create_metrics_request_bytes();
        assert_otlp_event(bytes, RESOURCE_METRICS_JSON_FIELD, false);
    }

    #[test]
    fn deserialize_otlp_traces() {
        let bytes = create_traces_request_bytes();
        assert_otlp_event(bytes, RESOURCE_SPANS_JSON_FIELD, true);
    }

    #[test]
    fn deserialize_invalid_otlp() {
        let payload = Bytes::from("invalid protobuf data");
        let outcome = OtlpDeserializer::default().parse(payload, LogNamespace::Legacy);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid OTLP data"));
    }

    #[test]
    fn deserialize_with_custom_priority_traces_only() {
        // Only traces are attempted: trace payloads parse, anything else is rejected.
        let traces_only = [OtlpSignalType::Traces].into_iter().collect();
        let deserializer = OtlpDeserializer::new_with_signals(traces_only);

        // A trace payload is accepted and comes back as a trace event.
        let parsed = deserializer.parse(create_traces_request_bytes(), LogNamespace::Legacy);
        assert!(parsed.is_ok());
        assert!(matches!(parsed.unwrap()[0], Event::Trace(_)));

        // A log payload is rejected because logs are not among the configured signals.
        let rejected = deserializer.parse(create_logs_request_bytes(), LogNamespace::Legacy);
        assert!(rejected.is_err());
    }
}
438}