codecs/encoding/format/
avro.rs

1use crate::encoding::BuildError;
2use bytes::{BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::Encoder;
5use vector_config::configurable_component;
6use vector_core::{config::DataType, event::Event, schema};
7
8/// Config used to build a `AvroSerializer`.
9#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct AvroSerializerConfig {
11    /// Options for the Avro serializer.
12    pub avro: AvroSerializerOptions,
13}
14
15impl AvroSerializerConfig {
16    /// Creates a new `AvroSerializerConfig`.
17    pub const fn new(schema: String) -> Self {
18        Self {
19            avro: AvroSerializerOptions { schema },
20        }
21    }
22
23    /// Build the `AvroSerializer` from this configuration.
24    pub fn build(&self) -> Result<AvroSerializer, BuildError> {
25        let schema = apache_avro::Schema::parse_str(&self.avro.schema)
26            .map_err(|error| format!("Failed building Avro serializer: {error}"))?;
27        Ok(AvroSerializer { schema })
28    }
29
30    /// The data type of events that are accepted by `AvroSerializer`.
31    pub fn input_type(&self) -> DataType {
32        DataType::Log
33    }
34
35    /// The schema required by the serializer.
36    pub fn schema_requirement(&self) -> schema::Requirement {
37        // TODO: Convert the Avro schema to a vector schema requirement.
38        schema::Requirement::empty()
39    }
40}
41
/// Apache Avro serializer options.
// NOTE(review): `configurable_component` presumably turns the `///` comments and the
// `#[configurable(metadata(...))]` attributes in this struct into user-facing config
// documentation, so their text is intentionally left unchanged.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroSerializerOptions {
    /// The Avro schema.
    // Raw schema JSON. Deserializing the config does not parse it; it is parsed by
    // `apache_avro::Schema::parse_str` in `AvroSerializerConfig::build`, so an invalid
    // schema surfaces there as a build error.
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#
    ))]
    #[configurable(metadata(docs::human_name = "Schema JSON"))]
    pub schema: String,
}
53
54/// Serializer that converts an `Event` to bytes using the Apache Avro format.
55#[derive(Debug, Clone)]
56pub struct AvroSerializer {
57    schema: apache_avro::Schema,
58}
59
60impl AvroSerializer {
61    /// Creates a new `AvroSerializer`.
62    pub const fn new(schema: apache_avro::Schema) -> Self {
63        Self { schema }
64    }
65}
66
67impl Encoder<Event> for AvroSerializer {
68    type Error = vector_common::Error;
69
70    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
71        let log = event.into_log();
72        let value = apache_avro::to_value(log)?;
73        let value = value.resolve(&self.schema)?;
74        let bytes = apache_avro::to_avro_datum(&self.schema, value)?;
75        buffer.put_slice(&bytes);
76        Ok(())
77    }
78}
79
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use indoc::indoc;
    use vector_core::event::{LogEvent, Value};
    use vrl::btreemap;

    use super::*;

    /// A record whose only field is a single-branch `["string"]` union encodes as the
    /// union branch index followed by the length-prefixed string.
    #[test]
    fn serialize_avro() {
        let schema_json = indoc! {r#"
            {
                "type": "record",
                "name": "Log",
                "fields": [
                    {
                        "name": "foo",
                        "type": ["string"]
                    }
                ]
            }
        "#}
        .to_owned();
        let mut serializer = AvroSerializerConfig::new(schema_json).build().unwrap();

        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }));
        let mut output = BytesMut::new();
        serializer.encode(event, &mut output).unwrap();

        // 0x00: union branch 0 (zig-zag), 0x06: string length 3 (zig-zag), then "bar".
        assert_eq!(output.freeze(), b"\0\x06bar".as_slice());
    }
}
113        assert_eq!(bytes.freeze(), b"\0\x06bar".as_slice());
114    }
115}