codecs/encoding/format/
avro.rs

1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7use crate::encoding::BuildError;
8
9/// Config used to build a `AvroSerializer`.
10#[derive(Debug, Clone, Deserialize, Serialize)]
11pub struct AvroSerializerConfig {
12    /// Options for the Avro serializer.
13    pub avro: AvroSerializerOptions,
14}
15
16impl AvroSerializerConfig {
17    /// Creates a new `AvroSerializerConfig`.
18    pub const fn new(schema: String) -> Self {
19        Self {
20            avro: AvroSerializerOptions { schema },
21        }
22    }
23
24    /// Build the `AvroSerializer` from this configuration.
25    pub fn build(&self) -> Result<AvroSerializer, BuildError> {
26        let schema = apache_avro::Schema::parse_str(&self.avro.schema)
27            .map_err(|error| format!("Failed building Avro serializer: {error}"))?;
28        Ok(AvroSerializer { schema })
29    }
30
31    /// The data type of events that are accepted by `AvroSerializer`.
32    pub fn input_type(&self) -> DataType {
33        DataType::Log
34    }
35
36    /// The schema required by the serializer.
37    pub fn schema_requirement(&self) -> schema::Requirement {
38        // TODO: Convert the Avro schema to a vector schema requirement.
39        schema::Requirement::empty()
40    }
41}
42
43/// Apache Avro serializer options.
44#[configurable_component]
45#[derive(Clone, Debug)]
46pub struct AvroSerializerOptions {
47    /// The Avro schema.
48    #[configurable(metadata(
49        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#
50    ))]
51    #[configurable(metadata(docs::human_name = "Schema JSON"))]
52    pub schema: String,
53}
54
55/// Serializer that converts an `Event` to bytes using the Apache Avro format.
56#[derive(Debug, Clone)]
57pub struct AvroSerializer {
58    schema: apache_avro::Schema,
59}
60
61impl AvroSerializer {
62    /// Creates a new `AvroSerializer`.
63    pub const fn new(schema: apache_avro::Schema) -> Self {
64        Self { schema }
65    }
66}
67
68impl Encoder<Event> for AvroSerializer {
69    type Error = vector_common::Error;
70
71    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
72        let log = event.into_log();
73        let value = apache_avro::to_value(log)?;
74        let value = value.resolve(&self.schema)?;
75        let bytes = apache_avro::to_avro_datum(&self.schema, value)?;
76        buffer.put_slice(&bytes);
77        Ok(())
78    }
79}
80
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use indoc::indoc;
    use vector_core::event::{LogEvent, Value};
    use vrl::btreemap;

    use super::*;

    /// Encodes a one-field log event against a record schema whose only field
    /// is a single-branch union of `string`, and checks the exact output bytes.
    #[test]
    fn serialize_avro() {
        let schema_json = indoc! {r#"
            {
                "type": "record",
                "name": "Log",
                "fields": [
                    {
                        "name": "foo",
                        "type": ["string"]
                    }
                ]
            }
        "#}
        .to_owned();
        let mut serializer = AvroSerializerConfig::new(schema_json).build().unwrap();

        let log = LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        });
        let mut encoded = BytesMut::new();
        serializer.encode(Event::Log(log), &mut encoded).unwrap();

        // Expected layout: union branch index 0, string length 3 (zig-zag
        // encoded as 0x06), then the UTF-8 bytes of "bar".
        assert_eq!(encoded.freeze(), b"\0\x06bar".as_slice());
    }
}