codecs/encoding/format/
protobuf.rs

1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use prost_reflect::{prost::Message as _, MessageDescriptor};
4use std::path::PathBuf;
5use tokio_util::codec::Encoder;
6use vector_config_macros::configurable_component;
7use vector_core::{
8    config::DataType,
9    event::{Event, Value},
10    schema,
11};
12
// NOTE(review): the `///` text on this type and its field is presumably consumed by
// `#[configurable_component]` when generating configuration docs — confirm before
// rewording it.
/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerConfig {
    /// Options for the Protobuf serializer.
    // Read by `build()` to locate the descriptor set file and the message type.
    pub protobuf: ProtobufSerializerOptions,
}
20
21impl ProtobufSerializerConfig {
22    /// Build the `ProtobufSerializer` from this configuration.
23    pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
24        let message_descriptor = vrl::protobuf::get_message_descriptor(
25            &self.protobuf.desc_file,
26            &self.protobuf.message_type,
27        )?;
28        Ok(ProtobufSerializer { message_descriptor })
29    }
30
31    /// The data type of events that are accepted by `ProtobufSerializer`.
32    pub fn input_type(&self) -> DataType {
33        DataType::Log | DataType::Trace
34    }
35
36    /// The schema required by the serializer.
37    pub fn schema_requirement(&self) -> schema::Requirement {
38        // While technically we support `Value` variants that can't be losslessly serialized to
39        // Protobuf, we don't want to enforce that limitation to users yet.
40        schema::Requirement::empty()
41    }
42}
43
// NOTE(review): the `///` text and `docs::examples` metadata below are presumably surfaced in
// generated configuration docs by `#[configurable_component]` — confirm before rewording.
/// Protobuf serializer options.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerOptions {
    /// The path to the protobuf descriptor set file.
    ///
    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`
    ///
    /// You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
    // Passed as the first argument to `vrl::protobuf::get_message_descriptor` in
    // `ProtobufSerializerConfig::build`.
    #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
    pub desc_file: PathBuf,

    /// The name of the message type to use for serializing.
    // Passed as the second argument to `vrl::protobuf::get_message_descriptor` in
    // `ProtobufSerializerConfig::build`.
    #[configurable(metadata(docs::examples = "package.Message"))]
    pub message_type: String,
}
60
/// Serializer that converts an `Event` to bytes using the Protobuf format.
///
/// The serializer holds the `MessageDescriptor` that events are encoded against. It is
/// created either directly through `ProtobufSerializer::new` or from configuration through
/// `ProtobufSerializerConfig::build`.
#[derive(Debug, Clone)]
pub struct ProtobufSerializer {
    /// The protobuf message definition to use for serialization.
    message_descriptor: MessageDescriptor,
}
67
68impl ProtobufSerializer {
69    /// Creates a new `ProtobufSerializer`.
70    pub fn new(message_descriptor: MessageDescriptor) -> Self {
71        Self { message_descriptor }
72    }
73
74    /// Get a description of the message type used in serialization.
75    pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
76        self.message_descriptor.descriptor_proto()
77    }
78}
79
80impl Encoder<Event> for ProtobufSerializer {
81    type Error = vector_common::Error;
82
83    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
84        let message = match event {
85            Event::Log(log) => {
86                vrl::protobuf::encode_message(&self.message_descriptor, log.into_parts().0)
87            }
88            Event::Metric(_) => unimplemented!(),
89            Event::Trace(trace) => vrl::protobuf::encode_message(
90                &self.message_descriptor,
91                Value::Object(trace.into_parts().0),
92            ),
93        }?;
94        message.encode(buffer).map_err(Into::into)
95    }
96}