1use bytes::{Buf, Bytes};
2use chrono::Utc;
3use lookup::event_path;
4use serde::{Deserialize, Serialize};
5use smallvec::{SmallVec, smallvec};
6use vector_config::configurable_component;
7use vector_core::{
8 config::{DataType, LogNamespace, log_schema},
9 event::{Event, LogEvent},
10 schema,
11};
12use vrl::value::KeyString;
13
14use super::Deserializer;
15use crate::encoding::AvroSerializerOptions;
16
/// Shorthand for the VRL value type that decoded Avro data is converted into.
type VrlValue = vrl::value::Value;
/// Shorthand for the dynamically typed Avro value produced by `apache_avro`.
type AvroValue = apache_avro::types::Value;

/// First byte that `parse` requires on the payload when `strip_schema_id_prefix` is enabled.
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Number of leading bytes `parse` discards when `strip_schema_id_prefix` is enabled.
// NOTE(review): presumably 1 magic byte + a 4-byte schema id (Confluent wire format) — confirm.
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;
22
/// Config used to build an `AvroDeserializer`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroDeserializerConfig {
    /// Options for the Avro deserializer (schema text and schema-id prefix handling).
    pub avro_options: AvroDeserializerOptions,
}
29
30impl AvroDeserializerConfig {
31 pub const fn new(schema: String, strip_schema_id_prefix: bool) -> Self {
33 Self {
34 avro_options: AvroDeserializerOptions {
35 schema,
36 strip_schema_id_prefix,
37 },
38 }
39 }
40
41 pub fn build(&self) -> AvroDeserializer {
43 let schema = apache_avro::Schema::parse_str(&self.avro_options.schema)
44 .map_err(|error| format!("Failed building Avro serializer: {error}"))
45 .unwrap();
46 AvroDeserializer {
47 schema,
48 strip_schema_id_prefix: self.avro_options.strip_schema_id_prefix,
49 }
50 }
51
52 pub fn output_type(&self) -> DataType {
54 DataType::Log
55 }
56
57 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59 match log_namespace {
60 LogNamespace::Legacy => {
61 let mut definition = schema::Definition::empty_legacy_namespace()
62 .unknown_fields(vrl::value::Kind::any());
63
64 if let Some(timestamp_key) = log_schema().timestamp_key() {
65 definition = definition.try_with_field(
66 timestamp_key,
67 vrl::value::Kind::any().or_timestamp(),
68 Some("timestamp"),
69 );
70 }
71 definition
72 }
73 LogNamespace::Vector => schema::Definition::new_with_default_metadata(
74 vrl::value::Kind::any(),
75 [log_namespace],
76 ),
77 }
78 }
79}
80
81impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
82 fn from(value: &AvroDeserializerOptions) -> Self {
83 Self {
84 schema: value.schema.clone(),
85 }
86 }
87}
/// Apache Avro deserializer options.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroDeserializerOptions {
    /// The Avro schema definition.
    ///
    /// **Note**: decoding fails when a datum contains one of the following
    /// `apache_avro::types::Value` variants, which the converter in this file
    /// rejects: `Date`, `Decimal`, `BigDecimal`, `Duration`, `Fixed`,
    /// `TimeMillis`, `TimestampNanos` and `LocalTimestampNanos`.
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#,
        docs::additional_props_description = r#"Supports most avro data types, unsupported data types includes
 ["decimal", "duration", "local-timestamp-millis", "local-timestamp-micros"]"#,
    ))]
    pub schema: String,

    /// Whether every datum is expected to start with a schema-id prefix.
    ///
    /// When `true`, the deserializer requires the payload to be at least
    /// 5 bytes long and to start with a `0` byte, and removes those 5 bytes
    /// before decoding; payloads without the prefix are rejected.
    pub strip_schema_id_prefix: bool,
}
110
/// Deserializer that converts Avro-encoded bytes into log events.
#[derive(Debug, Clone)]
pub struct AvroDeserializer {
    /// Schema handed to `apache_avro::from_avro_datum` when decoding a datum.
    schema: apache_avro::Schema,
    /// When `true`, `parse` requires and removes the leading schema-id prefix.
    strip_schema_id_prefix: bool,
}
117
118impl AvroDeserializer {
119 pub const fn new(schema: apache_avro::Schema, strip_schema_id_prefix: bool) -> Self {
121 Self {
122 schema,
123 strip_schema_id_prefix,
124 }
125 }
126}
127
128impl Deserializer for AvroDeserializer {
129 fn parse(
130 &self,
131 bytes: Bytes,
132 log_namespace: LogNamespace,
133 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
134 if bytes.is_empty() {
136 return Ok(smallvec![]);
137 }
138
139 let bytes = if self.strip_schema_id_prefix {
140 if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
141 bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
142 } else {
143 return Err(vector_common::Error::from(
144 "Expected avro datum to be prefixed with schema id",
145 ));
146 }
147 } else {
148 bytes
149 };
150
151 let value = apache_avro::from_avro_datum(&self.schema, &mut bytes.reader(), None)?;
152
153 let apache_avro::types::Value::Record(fields) = value else {
154 return Err(vector_common::Error::from("Expected an avro Record"));
155 };
156
157 let mut log = LogEvent::default();
158 for (k, v) in fields {
159 log.insert(event_path!(k.as_str()), try_from(v)?);
160 }
161
162 let mut event = Event::Log(log);
163 let event = match log_namespace {
164 LogNamespace::Vector => event,
165 LogNamespace::Legacy => {
166 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
167 let log = event.as_mut_log();
168 if !log.contains(timestamp_key) {
169 let timestamp = Utc::now();
170 log.insert(timestamp_key, timestamp);
171 }
172 }
173 event
174 }
175 };
176 Ok(smallvec![event])
177 }
178}
179
180pub fn try_from(value: AvroValue) -> vector_common::Result<VrlValue> {
182 match value {
185 AvroValue::Array(array) => {
186 let mut vector = Vec::new();
187 for item in array {
188 vector.push(try_from(item)?);
189 }
190 Ok(VrlValue::Array(vector))
191 }
192 AvroValue::Boolean(boolean) => Ok(VrlValue::from(boolean)),
193 AvroValue::Bytes(bytes) => Ok(VrlValue::from(bytes)),
194 AvroValue::Date(_) => Err(vector_common::Error::from(
195 "AvroValue::Date is not supported",
196 )),
197 AvroValue::Decimal(_) => Err(vector_common::Error::from(
198 "AvroValue::Decimal is not supported",
199 )),
200 AvroValue::Double(double) => Ok(VrlValue::from_f64_or_zero(double)),
201 AvroValue::Duration(_) => Err(vector_common::Error::from(
202 "AvroValue::Duration is not supported",
203 )),
204 AvroValue::Enum(_, string) => Ok(VrlValue::from(string)),
205 AvroValue::Fixed(_, _) => Err(vector_common::Error::from(
206 "AvroValue::Fixed is not supported",
207 )),
208 AvroValue::Float(float) => Ok(VrlValue::from_f64_or_zero(float as f64)),
209 AvroValue::Int(int) => Ok(VrlValue::from(int)),
210 AvroValue::Long(long) => Ok(VrlValue::from(long)),
211 AvroValue::Map(items) => items
212 .into_iter()
213 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
214 .collect::<Result<Vec<_>, _>>()
215 .map(|v| VrlValue::Object(v.into_iter().collect())),
216 AvroValue::Null => Ok(VrlValue::Null),
217 AvroValue::Record(items) => items
218 .into_iter()
219 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
220 .collect::<Result<Vec<_>, _>>()
221 .map(|v| VrlValue::Object(v.into_iter().collect())),
222 AvroValue::String(string) => Ok(VrlValue::from(string)),
223 AvroValue::TimeMicros(time_micros) => Ok(VrlValue::from(time_micros)),
224 AvroValue::TimeMillis(_) => Err(vector_common::Error::from(
225 "AvroValue::TimeMillis is not supported",
226 )),
227 AvroValue::TimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
228 AvroValue::TimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
229 AvroValue::Union(_, v) => try_from(*v),
230 AvroValue::Uuid(uuid) => Ok(VrlValue::from(uuid.as_hyphenated().to_string())),
231 AvroValue::LocalTimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
232 AvroValue::LocalTimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
233 AvroValue::BigDecimal(_) => Err(vector_common::Error::from(
234 "AvroValue::BigDecimal is not supported",
235 )),
236 AvroValue::TimestampNanos(_) => Err(vector_common::Error::from(
237 "AvroValue::TimestampNanos is not supported",
238 )),
239 AvroValue::LocalTimestampNanos(_) => Err(vector_common::Error::from(
240 "AvroValue::LocalTimestampNanos is not supported",
241 )),
242 }
243}
244
#[cfg(test)]
mod tests {
    use apache_avro::Schema;
    use bytes::BytesMut;
    use uuid::Uuid;

    use super::*;

    /// Record type matching the schema returned by `get_schema`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Log {
        message: String,
    }

    /// Schema shared by every test: a `log` record with one string field.
    const LOG_SCHEMA: &str = r#"{
                "type": "record",
                "name": "log",
                "fields": [
                    {
                        "name": "message",
                        "type": "string"
                    }
                ]
            }
            "#;

    fn get_schema() -> Schema {
        Schema::parse_str(LOG_SCHEMA).unwrap()
    }

    /// Encodes a `Log` carrying `message` as a bare Avro datum.
    fn encode_message(schema: &Schema, message: &str) -> Vec<u8> {
        let record = apache_avro::to_value(Log {
            message: message.to_owned(),
        })
        .unwrap();
        apache_avro::to_avro_datum(schema, record).unwrap()
    }

    /// Puts five zero bytes in front of `datum`, the prefix that `parse`
    /// strips when `strip_schema_id_prefix` is enabled.
    fn with_schema_id_prefix(datum: Vec<u8>) -> Bytes {
        let mut framed = BytesMut::new();
        framed.extend([0, 0, 0, 0, 0]);
        framed.extend(datum);
        framed.freeze()
    }

    /// Asserts that exactly one event was produced and that its `message`
    /// field equals `expected`.
    #[track_caller]
    fn assert_single_message(events: &[Event], expected: VrlValue) {
        assert_eq!(events.len(), 1);

        assert_eq!(events[0].as_log().get("message").unwrap(), &expected);
    }

    #[test]
    fn deserialize_avro() {
        let schema = get_schema();
        let payload = Bytes::from(encode_message(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, false)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from("hello from avro"));
    }

    #[test]
    fn deserialize_avro_strip_schema_id_prefix() {
        let schema = get_schema();
        let payload = with_schema_id_prefix(encode_message(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from("hello from avro"));
    }

    #[test]
    fn deserialize_avro_uuid() {
        let schema = get_schema();
        // The schema field is a plain string, so the UUID travels as text.
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let payload = with_schema_id_prefix(encode_message(&schema, &uuid));

        let events = AvroDeserializer::new(schema, true)
            .parse(payload, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, VrlValue::from(uuid));
    }
}