use crate::encoding::BuildError;
use bytes::BytesMut;
use prost_reflect::{prost::Message as _, MessageDescriptor};
use std::path::PathBuf;
use tokio_util::codec::Encoder;
use vector_config_macros::configurable_component;
use vector_core::{
config::DataType,
event::{Event, Value},
schema,
};
/// Config used to build a `ProtobufSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerConfig {
/// Options for the Protobuf serializer.
pub protobuf: ProtobufSerializerOptions,
}
impl ProtobufSerializerConfig {
pub fn build(&self) -> Result<ProtobufSerializer, BuildError> {
let message_descriptor = vrl::protobuf::get_message_descriptor(
&self.protobuf.desc_file,
&self.protobuf.message_type,
)?;
Ok(ProtobufSerializer { message_descriptor })
}
pub fn input_type(&self) -> DataType {
DataType::Log
}
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty()
}
}
/// Serializer options for Protobuf.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct ProtobufSerializerOptions {
/// The path to the protobuf descriptor set file.
#[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))]
pub desc_file: PathBuf,
/// The name of the message type to use for serializing.
#[configurable(metadata(docs::examples = "package.Message"))]
pub message_type: String,
}
/// Serializer that converts an `Event` to bytes using the Protobuf format.
#[derive(Debug, Clone)]
pub struct ProtobufSerializer {
/// The protobuf message definition to use for serialization.
message_descriptor: MessageDescriptor,
}
impl ProtobufSerializer {
pub fn new(message_descriptor: MessageDescriptor) -> Self {
Self { message_descriptor }
}
pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto {
self.message_descriptor.descriptor_proto()
}
}
impl Encoder<Event> for ProtobufSerializer {
    type Error = vector_common::Error;

    /// Encodes `event` as a Protobuf message and writes the bytes to `buffer`.
    ///
    /// Log events are encoded from their value; trace events are encoded from
    /// their fields wrapped in `Value::Object`. The event metadata is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error when the event cannot be converted into a message of
    /// the configured type, or when writing the message to `buffer` fails.
    ///
    /// # Panics
    ///
    /// Panics if `event` is a metric; metrics are not supported by this
    /// serializer.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let descriptor = &self.message_descriptor;
        let encoded = match event {
            Event::Metric(_) => unimplemented!(),
            Event::Log(log) => {
                // Keep the metadata alive until the message has been built.
                let (value, _metadata) = log.into_parts();
                vrl::protobuf::encode_message(descriptor, value)
            }
            Event::Trace(trace) => {
                let (fields, _metadata) = trace.into_parts();
                vrl::protobuf::encode_message(descriptor, Value::Object(fields))
            }
        };
        let message = encoded?;
        message.encode(buffer)?;
        Ok(())
    }
}