codecs/encoding/format/
protobuf.rs1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use prost_reflect::{prost::Message as _, MessageDescriptor};
4use std::path::PathBuf;
5use tokio_util::codec::Encoder;
6use vector_config_macros::configurable_component;
7use vector_core::{
8 config::DataType,
9 event::{Event, Value},
10 schema,
11};
12
13#[configurable_component]
15#[derive(Debug, Clone)]
16pub struct ProtobufSerializerConfig {
17 pub protobuf: ProtobufSerializerOptions,
19}
20
21impl ProtobufSerializerConfig {
22 pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
24 let message_descriptor = vrl::protobuf::get_message_descriptor(
25 &self.protobuf.desc_file,
26 &self.protobuf.message_type,
27 )?;
28 Ok(ProtobufSerializer { message_descriptor })
29 }
30
31 pub fn input_type(&self) -> DataType {
33 DataType::Log | DataType::Trace
34 }
35
36 pub fn schema_requirement(&self) -> schema::Requirement {
38 schema::Requirement::empty()
41 }
42}
43
44#[configurable_component]
46#[derive(Debug, Clone)]
47pub struct ProtobufSerializerOptions {
48 #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
54 pub desc_file: PathBuf,
55
56 #[configurable(metadata(docs::examples = "package.Message"))]
58 pub message_type: String,
59}
60
61#[derive(Debug, Clone)]
63pub struct ProtobufSerializer {
64 message_descriptor: MessageDescriptor,
66}
67
68impl ProtobufSerializer {
69 pub fn new(message_descriptor: MessageDescriptor) -> Self {
71 Self { message_descriptor }
72 }
73
74 pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
76 self.message_descriptor.descriptor_proto()
77 }
78}
79
80impl Encoder<Event> for ProtobufSerializer {
81 type Error = vector_common::Error;
82
83 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
84 let message = match event {
85 Event::Log(log) => {
86 vrl::protobuf::encode_message(&self.message_descriptor, log.into_parts().0)
87 }
88 Event::Metric(_) => unimplemented!(),
89 Event::Trace(trace) => vrl::protobuf::encode_message(
90 &self.message_descriptor,
91 Value::Object(trace.into_parts().0),
92 ),
93 }?;
94 message.encode(buffer).map_err(Into::into)
95 }
96}