codecs/encoding/format/protobuf.rs

1use std::path::PathBuf;
2
3use crate::encoding::BuildError;
4use bytes::BytesMut;
5use prost_reflect::{MessageDescriptor, prost::Message as _};
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9    config::DataType,
10    event::{Event, Value},
11    schema,
12};
13use vrl::protobuf::{
14    descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
15    encode::{Options, encode_message},
16};
17
18/// Config used to build a `ProtobufSerializer`.
19#[configurable_component]
20#[derive(Debug, Clone)]
21pub struct ProtobufSerializerConfig {
22    /// Options for the Protobuf serializer.
23    pub protobuf: ProtobufSerializerOptions,
24}
25
26impl ProtobufSerializerConfig {
27    /// Build the `ProtobufSerializer` from this configuration.
28    pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
29        let message_descriptor =
30            get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?;
31        Ok(ProtobufSerializer {
32            message_descriptor,
33            options: Options {
34                use_json_names: self.protobuf.use_json_names,
35            },
36        })
37    }
38
39    /// The data type of events that are accepted by `ProtobufSerializer`.
40    pub fn input_type(&self) -> DataType {
41        DataType::Log | DataType::Trace
42    }
43
44    /// The schema required by the serializer.
45    pub fn schema_requirement(&self) -> schema::Requirement {
46        // While technically we support `Value` variants that can't be losslessly serialized to
47        // Protobuf, we don't want to enforce that limitation to users yet.
48        schema::Requirement::empty()
49    }
50}
51
/// Protobuf serializer options.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerOptions {
    /// The path to the protobuf descriptor set file.
    ///
    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`.
    ///
    /// See the [Buf documentation on images](https://buf.build/docs/reference/images/#how-buf-images-work) for more details.
    // Read by `get_message_descriptor` in `ProtobufSerializerConfig::build`.
    #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
    pub desc_file: PathBuf,

    /// The name of the message type to use for serializing.
    // Looked up inside `desc_file`; the example suggests a package-qualified name — confirm
    // whether unqualified names are accepted by `get_message_descriptor`.
    #[configurable(metadata(docs::examples = "package.Message"))]
    pub message_type: String,

    /// Use JSON field names (camelCase) instead of protobuf field names (snake_case).
    ///
    /// When enabled, the serializer looks for fields using their JSON names as defined
    /// in the `.proto` file (for example, `jobDescription` instead of `job_description`).
    ///
    /// This is useful when working with data that has already been converted from JSON or
    /// when interfacing with systems that use JSON naming conventions.
    // `serde(default)` makes this `false` when absent; it is skipped on serialization when
    // `vector_core::serde::is_default` reports the default value.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub use_json_names: bool,
}
78
/// Serializer that converts an `Event` to bytes using the Protobuf format.
///
/// Intended for log and trace events, matching `ProtobufSerializerConfig::input_type`.
#[derive(Debug, Clone)]
pub struct ProtobufSerializer {
    /// The protobuf message definition to use for serialization.
    message_descriptor: MessageDescriptor,
    /// Options forwarded to `encode_message` for every event (for example, whether fields
    /// are matched by their JSON names).
    options: Options,
}
86
87impl ProtobufSerializer {
88    /// Creates a new `ProtobufSerializer`.
89    pub fn new(message_descriptor: MessageDescriptor) -> Self {
90        Self {
91            message_descriptor,
92            options: Options::default(),
93        }
94    }
95
96    /// Creates a new serializer instance using the descriptor bytes directly.
97    pub fn new_from_bytes(
98        desc_bytes: &[u8],
99        message_type: &str,
100        options: &Options,
101    ) -> vector_common::Result<Self> {
102        let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
103        Ok(Self {
104            message_descriptor,
105            options: options.clone(),
106        })
107    }
108
109    /// Get a description of the message type used in serialization.
110    pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
111        self.message_descriptor.descriptor_proto()
112    }
113}
114
115impl Encoder<Event> for ProtobufSerializer {
116    type Error = vector_common::Error;
117
118    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
119        let message = match event {
120            Event::Log(log) => {
121                encode_message(&self.message_descriptor, log.into_parts().0, &self.options)
122            }
123            Event::Metric(_) => unimplemented!(),
124            Event::Trace(trace) => encode_message(
125                &self.message_descriptor,
126                Value::Object(trace.into_parts().0),
127                &self.options,
128            ),
129        }?;
130        message.encode(buffer).map_err(Into::into)
131    }
132}