codecs/encoding/format/
otlp.rs1use crate::encoding::ProtobufSerializer;
2use bytes::BytesMut;
3use opentelemetry_proto::proto::{
4 DESCRIPTOR_BYTES, LOGS_REQUEST_MESSAGE_TYPE, METRICS_REQUEST_MESSAGE_TYPE,
5 RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
6 TRACES_REQUEST_MESSAGE_TYPE,
7};
8use tokio_util::codec::Encoder;
9use vector_config_macros::configurable_component;
10use vector_core::{config::DataType, event::Event, schema};
11use vrl::protobuf::encode::Options;
12
13#[configurable_component]
15#[derive(Debug, Clone, Default)]
16pub struct OtlpSerializerConfig {
17 }
19
20impl OtlpSerializerConfig {
21 pub fn build(&self) -> Result<OtlpSerializer, crate::encoding::BuildError> {
23 OtlpSerializer::new()
24 }
25
26 pub fn input_type(&self) -> DataType {
28 DataType::Log | DataType::Trace
29 }
30
31 pub fn schema_requirement(&self) -> schema::Requirement {
33 schema::Requirement::empty()
34 }
35}
36
37#[derive(Debug, Clone)]
54#[allow(dead_code)] pub struct OtlpSerializer {
56 logs_descriptor: ProtobufSerializer,
57 metrics_descriptor: ProtobufSerializer,
58 traces_descriptor: ProtobufSerializer,
59 options: Options,
60}
61
62impl OtlpSerializer {
63 pub fn new() -> vector_common::Result<Self> {
65 let options = Options {
66 use_json_names: true,
67 };
68
69 let logs_descriptor = ProtobufSerializer::new_from_bytes(
70 DESCRIPTOR_BYTES,
71 LOGS_REQUEST_MESSAGE_TYPE,
72 &options,
73 )?;
74
75 let metrics_descriptor = ProtobufSerializer::new_from_bytes(
76 DESCRIPTOR_BYTES,
77 METRICS_REQUEST_MESSAGE_TYPE,
78 &options,
79 )?;
80
81 let traces_descriptor = ProtobufSerializer::new_from_bytes(
82 DESCRIPTOR_BYTES,
83 TRACES_REQUEST_MESSAGE_TYPE,
84 &options,
85 )?;
86
87 Ok(Self {
88 logs_descriptor,
89 metrics_descriptor,
90 traces_descriptor,
91 options,
92 })
93 }
94}
95
96impl Encoder<Event> for OtlpSerializer {
97 type Error = vector_common::Error;
98
99 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
100 match &event {
104 Event::Log(log) => {
105 if log.contains(RESOURCE_LOGS_JSON_FIELD) {
106 self.logs_descriptor.encode(event, buffer)
107 } else if log.contains(RESOURCE_METRICS_JSON_FIELD) {
108 self.metrics_descriptor.encode(event, buffer)
110 } else {
111 Err(format!(
112 "Log event does not contain OTLP top-level fields ({RESOURCE_LOGS_JSON_FIELD} or {RESOURCE_METRICS_JSON_FIELD})",
113 )
114 .into())
115 }
116 }
117 Event::Trace(trace) => {
118 if trace.contains(RESOURCE_SPANS_JSON_FIELD) {
119 self.traces_descriptor.encode(event, buffer)
120 } else {
121 Err(format!(
122 "Trace event does not contain OTLP top-level field ({RESOURCE_SPANS_JSON_FIELD})",
123 )
124 .into())
125 }
126 }
127 Event::Metric(_) => {
128 Err("OTLP serializer does not support native Vector metrics yet.".into())
129 }
130 }
131 }
132}