codecs/encoding/format/
protobuf.rs1use std::path::PathBuf;
2
3use crate::encoding::BuildError;
4use bytes::BytesMut;
5use prost_reflect::{MessageDescriptor, prost::Message as _};
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9 config::DataType,
10 event::{Event, Value},
11 schema,
12};
13use vrl::protobuf::{
14 descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
15 encode::{Options, encode_message},
16};
17
18#[configurable_component]
20#[derive(Debug, Clone)]
21pub struct ProtobufSerializerConfig {
22 pub protobuf: ProtobufSerializerOptions,
24}
25
26impl ProtobufSerializerConfig {
27 pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
29 let message_descriptor =
30 get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
31 Ok(ProtobufSerializer {
32 message_descriptor,
33 options: Options {
34 use_json_names: self.protobuf.use_json_names,
35 },
36 })
37 }
38
39 pub fn input_type(&self) -> DataType {
41 DataType::Log | DataType::Trace
42 }
43
44 pub fn schema_requirement(&self) -> schema::Requirement {
46 schema::Requirement::empty()
49 }
50}
51
52#[configurable_component]
54#[derive(Debug, Clone)]
55pub struct ProtobufSerializerOptions {
56 #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
62 pub desc_file: PathBuf,
63
64 #[configurable(metadata(docs::examples = "package.Message"))]
66 pub message_type: String,
67
68 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
76 pub use_json_names: bool,
77}
78
79#[derive(Debug, Clone)]
81pub struct ProtobufSerializer {
82 message_descriptor: MessageDescriptor,
84 options: Options,
85}
86
87impl ProtobufSerializer {
88 pub fn new(message_descriptor: MessageDescriptor) -> Self {
90 Self {
91 message_descriptor,
92 options: Options::default(),
93 }
94 }
95
96 pub fn new_from_bytes(
98 desc_bytes: &[u8],
99 message_type: &str,
100 options: &Options,
101 ) -> vector_common::Result<Self> {
102 let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
103 Ok(Self {
104 message_descriptor,
105 options: options.clone(),
106 })
107 }
108
109 pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
111 self.message_descriptor.descriptor_proto()
112 }
113}
114
115impl Encoder<Event> for ProtobufSerializer {
116 type Error = vector_common::Error;
117
118 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
119 let message = match event {
120 Event::Log(log) => {
121 encode_message(&self.message_descriptor, log.into_parts().0, &self.options)
122 }
123 Event::Metric(_) => unimplemented!(),
124 Event::Trace(trace) => encode_message(
125 &self.message_descriptor,
126 Value::Object(trace.into_parts().0),
127 &self.options,
128 ),
129 }?;
130 message.encode(buffer).map_err(Into::into)
131 }
132}