codecs/decoding/format/
avro.rs

1use super::Deserializer;
2use crate::encoding::AvroSerializerOptions;
3use bytes::Buf;
4use bytes::Bytes;
5use chrono::Utc;
6use lookup::event_path;
7use serde::{Deserialize, Serialize};
8use smallvec::{smallvec, SmallVec};
9use vector_config::configurable_component;
10use vector_core::{
11    config::{log_schema, DataType, LogNamespace},
12    event::{Event, LogEvent},
13    schema,
14};
15use vrl::value::KeyString;
16
17type VrlValue = vrl::value::Value;
18type AvroValue = apache_avro::types::Value;
19
20const CONFLUENT_MAGIC_BYTE: u8 = 0;
21const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;
22
23/// Config used to build a `AvroDeserializer`.
24#[derive(Debug, Clone, Deserialize, Serialize)]
25pub struct AvroDeserializerConfig {
26    /// Options for the Avro deserializer.
27    pub avro_options: AvroDeserializerOptions,
28}
29
30impl AvroDeserializerConfig {
31    /// Creates a new `AvroDeserializerConfig`.
32    pub const fn new(schema: String, strip_schema_id_prefix: bool) -> Self {
33        Self {
34            avro_options: AvroDeserializerOptions {
35                schema,
36                strip_schema_id_prefix,
37            },
38        }
39    }
40
41    /// Build the `AvroDeserializer` from this configuration.
42    pub fn build(&self) -> AvroDeserializer {
43        let schema = apache_avro::Schema::parse_str(&self.avro_options.schema)
44            .map_err(|error| format!("Failed building Avro serializer: {error}"))
45            .unwrap();
46        AvroDeserializer {
47            schema,
48            strip_schema_id_prefix: self.avro_options.strip_schema_id_prefix,
49        }
50    }
51
52    /// The data type of events that are accepted by `AvroDeserializer`.
53    pub fn output_type(&self) -> DataType {
54        DataType::Log
55    }
56
57    /// The schema required by the serializer.
58    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59        match log_namespace {
60            LogNamespace::Legacy => {
61                let mut definition = schema::Definition::empty_legacy_namespace()
62                    .unknown_fields(vrl::value::Kind::any());
63
64                if let Some(timestamp_key) = log_schema().timestamp_key() {
65                    definition = definition.try_with_field(
66                        timestamp_key,
67                        vrl::value::Kind::any().or_timestamp(),
68                        Some("timestamp"),
69                    );
70                }
71                definition
72            }
73            LogNamespace::Vector => schema::Definition::new_with_default_metadata(
74                vrl::value::Kind::any(),
75                [log_namespace],
76            ),
77        }
78    }
79}
80
81impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
82    fn from(value: &AvroDeserializerOptions) -> Self {
83        Self {
84            schema: value.schema.clone(),
85        }
86    }
87}
88/// Apache Avro serializer options.
89#[configurable_component]
90#[derive(Clone, Debug)]
91pub struct AvroDeserializerOptions {
92    /// The Avro schema definition.
93    /// **Note**: The following [`apache_avro::types::Value`] variants are *not* supported:
94    /// * `Date`
95    /// * `Decimal`
96    /// * `Duration`
97    /// * `Fixed`
98    /// * `TimeMillis`
99    #[configurable(metadata(
100        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#,
101        docs::additional_props_description = r#"Supports most avro data types, unsupported data types includes
102        ["decimal", "duration", "local-timestamp-millis", "local-timestamp-micros"]"#,
103    ))]
104    pub schema: String,
105
106    /// For Avro datum encoded in Kafka messages, the bytes are prefixed with the schema ID.  Set this to `true` to strip the schema ID prefix.
107    /// According to [Confluent Kafka's document](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
108    pub strip_schema_id_prefix: bool,
109}
110
111/// Serializer that converts bytes to an `Event` using the Apache Avro format.
112#[derive(Debug, Clone)]
113pub struct AvroDeserializer {
114    schema: apache_avro::Schema,
115    strip_schema_id_prefix: bool,
116}
117
118impl AvroDeserializer {
119    /// Creates a new `AvroDeserializer`.
120    pub const fn new(schema: apache_avro::Schema, strip_schema_id_prefix: bool) -> Self {
121        Self {
122            schema,
123            strip_schema_id_prefix,
124        }
125    }
126}
127
128impl Deserializer for AvroDeserializer {
129    fn parse(
130        &self,
131        bytes: Bytes,
132        log_namespace: LogNamespace,
133    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
134        // Avro has a `null` type which indicates no value.
135        if bytes.is_empty() {
136            return Ok(smallvec![]);
137        }
138
139        let bytes = if self.strip_schema_id_prefix {
140            if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
141                bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
142            } else {
143                return Err(vector_common::Error::from(
144                    "Expected avro datum to be prefixed with schema id",
145                ));
146            }
147        } else {
148            bytes
149        };
150
151        let value = apache_avro::from_avro_datum(&self.schema, &mut bytes.reader(), None)?;
152
153        let apache_avro::types::Value::Record(fields) = value else {
154            return Err(vector_common::Error::from("Expected an avro Record"));
155        };
156
157        let mut log = LogEvent::default();
158        for (k, v) in fields {
159            log.insert(event_path!(k.as_str()), try_from(v)?);
160        }
161
162        let mut event = Event::Log(log);
163        let event = match log_namespace {
164            LogNamespace::Vector => event,
165            LogNamespace::Legacy => {
166                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
167                    let log = event.as_mut_log();
168                    if !log.contains(timestamp_key) {
169                        let timestamp = Utc::now();
170                        log.insert(timestamp_key, timestamp);
171                    }
172                }
173                event
174            }
175        };
176        Ok(smallvec![event])
177    }
178}
179
180// Can't use std::convert::TryFrom because of orphan rules
181pub fn try_from(value: AvroValue) -> vector_common::Result<VrlValue> {
182    // Very similar to avro to json see `impl std::convert::TryFrom<AvroValue> for serde_json::Value`
183    // LogEvent has native support for bytes, so it is used for Bytes and Fixed
184    match value {
185        AvroValue::Array(array) => {
186            let mut vector = Vec::new();
187            for item in array {
188                vector.push(try_from(item)?);
189            }
190            Ok(VrlValue::Array(vector))
191        }
192        AvroValue::Boolean(boolean) => Ok(VrlValue::from(boolean)),
193        AvroValue::Bytes(bytes) => Ok(VrlValue::from(bytes)),
194        AvroValue::Date(_) => Err(vector_common::Error::from(
195            "AvroValue::Date is not supported",
196        )),
197        AvroValue::Decimal(_) => Err(vector_common::Error::from(
198            "AvroValue::Decimal is not supported",
199        )),
200        AvroValue::Double(double) => Ok(VrlValue::from_f64_or_zero(double)),
201        AvroValue::Duration(_) => Err(vector_common::Error::from(
202            "AvroValue::Duration is not supported",
203        )),
204        AvroValue::Enum(_, string) => Ok(VrlValue::from(string)),
205        AvroValue::Fixed(_, _) => Err(vector_common::Error::from(
206            "AvroValue::Fixed is not supported",
207        )),
208        AvroValue::Float(float) => Ok(VrlValue::from_f64_or_zero(float as f64)),
209        AvroValue::Int(int) => Ok(VrlValue::from(int)),
210        AvroValue::Long(long) => Ok(VrlValue::from(long)),
211        AvroValue::Map(items) => items
212            .into_iter()
213            .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
214            .collect::<Result<Vec<_>, _>>()
215            .map(|v| VrlValue::Object(v.into_iter().collect())),
216        AvroValue::Null => Ok(VrlValue::Null),
217        AvroValue::Record(items) => items
218            .into_iter()
219            .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
220            .collect::<Result<Vec<_>, _>>()
221            .map(|v| VrlValue::Object(v.into_iter().collect())),
222        AvroValue::String(string) => Ok(VrlValue::from(string)),
223        AvroValue::TimeMicros(time_micros) => Ok(VrlValue::from(time_micros)),
224        AvroValue::TimeMillis(_) => Err(vector_common::Error::from(
225            "AvroValue::TimeMillis is not supported",
226        )),
227        AvroValue::TimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
228        AvroValue::TimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
229        AvroValue::Union(_, v) => try_from(*v),
230        AvroValue::Uuid(uuid) => Ok(VrlValue::from(uuid.as_hyphenated().to_string())),
231        AvroValue::LocalTimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
232        AvroValue::LocalTimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
233    }
234}
235
#[cfg(test)]
mod tests {
    use apache_avro::Schema;
    use bytes::BytesMut;
    use uuid::Uuid;

    use super::*;

    /// Record type matching the schema returned by `get_schema`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Log {
        message: String,
    }

    /// Parses the single-field `log` record schema shared by all tests.
    fn get_schema() -> Schema {
        let schema = String::from(
            r#"{
                "type": "record",
                "name": "log",
                "fields": [
                    {
                        "name": "message",
                        "type": "string"
                    }
                ]
            }
        "#,
        );

        Schema::parse_str(&schema).unwrap()
    }

    /// Encodes a `Log` carrying `message` as a raw Avro datum.
    fn encode_log(schema: &Schema, message: &str) -> Vec<u8> {
        let record = apache_avro::to_value(Log {
            message: message.to_owned(),
        })
        .unwrap();
        apache_avro::to_avro_datum(schema, record).unwrap()
    }

    /// Prepends the five-byte prefix: 0 prefix + 4 byte schema id.
    fn with_schema_id_prefix(datum: Vec<u8>) -> Bytes {
        let mut framed = BytesMut::new();
        framed.extend([0, 0, 0, 0, 0]);
        framed.extend(datum);
        framed.freeze()
    }

    /// Asserts that exactly one event was produced and that its `message` matches.
    fn assert_single_message(events: &[Event], expected: &str) {
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log().get("message").unwrap(),
            &VrlValue::from(expected)
        );
    }

    #[test]
    fn deserialize_avro() {
        let schema = get_schema();
        let payload = Bytes::from(encode_log(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, false)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_strip_schema_id_prefix() {
        let schema = get_schema();
        let payload = with_schema_id_prefix(encode_log(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_uuid() {
        let schema = get_schema();
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let payload = with_schema_id_prefix(encode_log(&schema, &uuid));

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, &uuid);
    }
}