codecs/encoding/format/
avro.rs1use crate::encoding::BuildError;
2use bytes::{BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::Encoder;
5use vector_config::configurable_component;
6use vector_core::{config::DataType, event::Event, schema};
7
8#[derive(Debug, Clone, Deserialize, Serialize)]
10pub struct AvroSerializerConfig {
11 pub avro: AvroSerializerOptions,
13}
14
15impl AvroSerializerConfig {
16 pub const fn new(schema: String) -> Self {
18 Self {
19 avro: AvroSerializerOptions { schema },
20 }
21 }
22
23 pub fn build(&self) -> Result<AvroSerializer, BuildError> {
25 let schema = apache_avro::Schema::parse_str(&self.avro.schema)
26 .map_err(|error| format!("Failed building Avro serializer: {error}"))?;
27 Ok(AvroSerializer { schema })
28 }
29
30 pub fn input_type(&self) -> DataType {
32 DataType::Log
33 }
34
35 pub fn schema_requirement(&self) -> schema::Requirement {
37 schema::Requirement::empty()
39 }
40}
41
42#[configurable_component]
44#[derive(Clone, Debug)]
45pub struct AvroSerializerOptions {
46 #[configurable(metadata(
48 docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#
49 ))]
50 #[configurable(metadata(docs::human_name = "Schema JSON"))]
51 pub schema: String,
52}
53
54#[derive(Debug, Clone)]
56pub struct AvroSerializer {
57 schema: apache_avro::Schema,
58}
59
60impl AvroSerializer {
61 pub const fn new(schema: apache_avro::Schema) -> Self {
63 Self { schema }
64 }
65}
66
67impl Encoder<Event> for AvroSerializer {
68 type Error = vector_common::Error;
69
70 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
71 let log = event.into_log();
72 let value = apache_avro::to_value(log)?;
73 let value = value.resolve(&self.schema)?;
74 let bytes = apache_avro::to_avro_datum(&self.schema, value)?;
75 buffer.put_slice(&bytes);
76 Ok(())
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use bytes::BytesMut;
83 use indoc::indoc;
84 use vector_core::event::{LogEvent, Value};
85 use vrl::btreemap;
86
87 use super::*;
88
89 #[test]
90 fn serialize_avro() {
91 let event = Event::Log(LogEvent::from(btreemap! {
92 "foo" => Value::from("bar")
93 }));
94 let schema = indoc! {r#"
95 {
96 "type": "record",
97 "name": "Log",
98 "fields": [
99 {
100 "name": "foo",
101 "type": ["string"]
102 }
103 ]
104 }
105 "#}
106 .to_owned();
107 let config = AvroSerializerConfig::new(schema);
108 let mut serializer = config.build().unwrap();
109 let mut bytes = BytesMut::new();
110
111 serializer.encode(event, &mut bytes).unwrap();
112
113 assert_eq!(bytes.freeze(), b"\0\x06bar".as_slice());
114 }
115}