use crate::encoding::BuildError;
use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
/// Configuration used to build an [`AvroSerializer`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroSerializerConfig {
/// Options for the Avro serializer, including the schema definition.
pub avro: AvroSerializerOptions,
}
impl AvroSerializerConfig {
pub const fn new(schema: String) -> Self {
Self {
avro: AvroSerializerOptions { schema },
}
}
pub fn build(&self) -> Result<AvroSerializer, BuildError> {
let schema = apache_avro::Schema::parse_str(&self.avro.schema)
.map_err(|error| format!("Failed building Avro serializer: {}", error))?;
Ok(AvroSerializer { schema })
}
pub fn input_type(&self) -> DataType {
DataType::Log
}
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty()
}
}
/// Apache Avro serializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroSerializerOptions {
/// The Avro schema definition, given as a JSON string.
#[configurable(metadata(
docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#
))]
#[configurable(metadata(docs::human_name = "Schema JSON"))]
pub schema: String,
}
/// Serializer that converts an `Event` to bytes using the Apache Avro format.
#[derive(Debug, Clone)]
pub struct AvroSerializer {
/// Parsed Avro schema that events are resolved against and encoded with.
schema: apache_avro::Schema,
}
impl AvroSerializer {
pub const fn new(schema: apache_avro::Schema) -> Self {
Self { schema }
}
}
impl Encoder<Event> for AvroSerializer {
    type Error = vector_common::Error;

    /// Encodes `event` as a single Avro datum and appends it to `buffer`.
    ///
    /// The event is converted with `Event::into_log`, turned into an Avro value,
    /// resolved against the serializer's schema, and finally written out in Avro
    /// datum form.
    ///
    /// NOTE(review): `into_log` presumably panics for non-log events; callers are
    /// expected to honour `AvroSerializerConfig::input_type` — confirm.
    ///
    /// # Errors
    ///
    /// Returns an error if the log cannot be converted to an Avro value, if the
    /// value does not resolve against the schema, or if encoding the datum fails.
    /// `buffer` is only written to after all of these steps have succeeded.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let schema = &self.schema;
        let datum = apache_avro::to_value(event.into_log())
            .and_then(|record| record.resolve(schema))
            .and_then(|resolved| apache_avro::to_avro_datum(schema, resolved))?;
        buffer.put_slice(datum.as_slice());
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use indoc::indoc;
    use vector_core::event::{LogEvent, Value};
    use vrl::btreemap;

    use super::*;

    /// Encodes a single-field log event against a record schema whose only field
    /// is a one-branch union, and checks the exact bytes of the resulting datum.
    #[test]
    fn serialize_avro() {
        let schema_json = String::from(indoc! {r#"
            {
            "type": "record",
            "name": "Log",
            "fields": [
            {
            "name": "foo",
            "type": ["string"]
            }
            ]
            }
        "#});
        let log = LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        });

        let mut serializer = AvroSerializerConfig::new(schema_json).build().unwrap();
        let mut encoded = BytesMut::new();
        serializer.encode(Event::Log(log), &mut encoded).unwrap();

        // `\0` is the index of the selected union branch and `\x06` is the
        // zig-zag encoded length (3) of the string "bar" that follows.
        assert_eq!(encoded.freeze(), b"\0\x06bar".as_slice());
    }
}