codecs/encoding/format/
otlp.rs

1use crate::encoding::ProtobufSerializer;
2use bytes::BytesMut;
3use opentelemetry_proto::proto::{
4    DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
5    RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
6    TRACES_REQUEST_MESSAGE_TYPE,
7};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::Options;
12
13/// Config used to build an `OtlpSerializer`.
14#[configurable_component]
15#[derive(Debug, Clone, Default)]
16pub struct OtlpSerializerConfig {
17    // No configuration options needed - OTLP serialization is opinionated
18}
19
20impl OtlpSerializerConfig {
21    /// Build the `OtlpSerializer` from this configuration.
22    pub fn build(&self) -> Result<OtlpSerializer, crate::encoding::BuildError> {
23        OtlpSerializer::new()
24    }
25
26    /// The data type of events that are accepted by `OtlpSerializer`.
27    pub fn input_type(&self) -> DataType {
28        DataType::Log | DataType::Trace
29    }
30
31    /// The schema required by the serializer.
32    pub fn schema_requirement(&self) -> schema::Requirement {
33        schema::Requirement::empty()
34    }
35}
36
37/// Serializer that converts an `Event` to bytes using the OTLP (OpenTelemetry Protocol) protobuf format.
38///
39/// This serializer encodes events using the OTLP protobuf specification, which is the recommended
40/// encoding format for OpenTelemetry data. The output is suitable for sending to OTLP-compatible
41/// endpoints with `content-type: application/x-protobuf`.
42///
43/// # Implementation approach
44///
45/// This serializer converts Vector's internal event representation to the appropriate OTLP message type
46/// based on the top-level field in the event:
47/// - `resourceLogs` → `ExportLogsServiceRequest`
48/// - `resourceMetrics` → `ExportMetricsServiceRequest`
49/// - `resourceSpans` → `ExportTraceServiceRequest`
50///
51/// The implementation is the inverse of what the `opentelemetry` source does when decoding,
52/// ensuring round-trip compatibility.
53#[derive(Debug, Clone)]
54#[allow(dead_code)] // Fields will be used once encoding is implemented
55pub struct OtlpSerializer {
56    logs_descriptor: ProtobufSerializer,
57    metrics_descriptor: ProtobufSerializer,
58    traces_descriptor: ProtobufSerializer,
59    options: Options,
60}
61
62impl OtlpSerializer {
63    /// Creates a new OTLP serializer with the appropriate message descriptors.
64    pub fn new() -> vector_common::Result<Self> {
65        let options = Options {
66            use_json_names: true,
67        };
68
69        let logs_descriptor = ProtobufSerializer::new_from_bytes(
70            DESCRIPTOR_BYTES,
71            LOGS_REQUEST_MESSAGE_TYPE,
72            &options,
73        )?;
74
75        let metrics_descriptor = ProtobufSerializer::new_from_bytes(
76            DESCRIPTOR_BYTES,
77            METRICS_REQUEST_MESSAGE_TYPE,
78            &options,
79        )?;
80
81        let traces_descriptor = ProtobufSerializer::new_from_bytes(
82            DESCRIPTOR_BYTES,
83            TRACES_REQUEST_MESSAGE_TYPE,
84            &options,
85        )?;
86
87        Ok(Self {
88            logs_descriptor,
89            metrics_descriptor,
90            traces_descriptor,
91            options,
92        })
93    }
94}
95
96impl Encoder<Event> for OtlpSerializer {
97    type Error = vector_common::Error;
98
99    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
100        // Determine which descriptor to use based on top-level OTLP fields
101        // This handles events that were decoded with use_otlp_decoding enabled
102        // The deserializer uses use_json_names: true, so fields are in camelCase
103        match &event {
104            Event::Log(log) => {
105                if log.contains(RESOURCE_LOGS_JSON_FIELD) {
106                    self.logs_descriptor.encode(event, buffer)
107                } else if log.contains(RESOURCE_METRICS_JSON_FIELD) {
108                    // Currently the OTLP metrics are Vector logs (not metrics).
109                    self.metrics_descriptor.encode(event, buffer)
110                } else {
111                    Err(format!(
112                        "Log event does not contain OTLP top-level fields ({RESOURCE_LOGS_JSON_FIELD} or {RESOURCE_METRICS_JSON_FIELD})",
113                    )
114                        .into())
115                }
116            }
117            Event::Trace(trace) => {
118                if trace.contains(RESOURCE_SPANS_JSON_FIELD) {
119                    self.traces_descriptor.encode(event, buffer)
120                } else {
121                    Err(format!(
122                        "Trace event does not contain OTLP top-level field ({RESOURCE_SPANS_JSON_FIELD})",
123                    )
124                        .into())
125                }
126            }
127            Event::Metric(_) => {
128                Err("OTLP serializer does not support native Vector metrics yet.".into())
129            }
130        }
131    }
132}