codecs/encoding/format/
protobuf.rs1use std::path::PathBuf;
2
3use bytes::BytesMut;
4use prost_reflect::{MessageDescriptor, prost::Message as _};
5use tokio_util::codec::Encoder;
6use vector_config_macros::configurable_component;
7use vector_core::{
8    config::DataType,
9    event::{Event, Value},
10    schema,
11};
12use vrl::protobuf::{descriptor::get_message_descriptor, encode::encode_message};
13
14use crate::encoding::BuildError;
15
/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerConfig {
    /// Options for the Protobuf serializer.
    pub protobuf: ProtobufSerializerOptions,
}
23
24impl ProtobufSerializerConfig {
25    pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
27        let message_descriptor =
28            get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
29        Ok(ProtobufSerializer { message_descriptor })
30    }
31
32    pub fn input_type(&self) -> DataType {
34        DataType::Log | DataType::Trace
35    }
36
37    pub fn schema_requirement(&self) -> schema::Requirement {
39        schema::Requirement::empty()
42    }
43}
44
/// Protobuf serializer options.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerOptions {
    /// The path to the protobuf descriptor set file.
    ///
    /// The message descriptor used for serialization is loaded from this file.
    #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
    pub desc_file: PathBuf,

    /// The name of the message type to use for serializing.
    #[configurable(metadata(docs::examples = "package.Message"))]
    pub message_type: String,
}
61
62#[derive(Debug, Clone)]
64pub struct ProtobufSerializer {
65    message_descriptor: MessageDescriptor,
67}
68
69impl ProtobufSerializer {
70    pub fn new(message_descriptor: MessageDescriptor) -> Self {
72        Self { message_descriptor }
73    }
74
75    pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
77        self.message_descriptor.descriptor_proto()
78    }
79}
80
81impl Encoder<Event> for ProtobufSerializer {
82    type Error = vector_common::Error;
83
84    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
85        let message = match event {
86            Event::Log(log) => encode_message(&self.message_descriptor, log.into_parts().0),
87            Event::Metric(_) => unimplemented!(),
88            Event::Trace(trace) => encode_message(
89                &self.message_descriptor,
90                Value::Object(trace.into_parts().0),
91            ),
92        }?;
93        message.encode(buffer).map_err(Into::into)
94    }
95}