1use super::Deserializer;
2use crate::encoding::AvroSerializerOptions;
3use bytes::Buf;
4use bytes::Bytes;
5use chrono::Utc;
6use lookup::event_path;
7use serde::{Deserialize, Serialize};
8use smallvec::{smallvec, SmallVec};
9use vector_config::configurable_component;
10use vector_core::{
11 config::{log_schema, DataType, LogNamespace},
12 event::{Event, LogEvent},
13 schema,
14};
15use vrl::value::KeyString;
16
/// VRL value type; aliased because the Avro crate's value type is also named `Value`.
type VrlValue = vrl::value::Value;
/// Avro value type as produced by `apache_avro` when decoding a datum.
type AvroValue = apache_avro::types::Value;

/// First byte that `parse` requires on a datum when `strip_schema_id_prefix` is enabled.
// NOTE(review): presumably the Confluent wire-format magic byte — confirm against the
// Confluent Schema Registry documentation.
const CONFLUENT_MAGIC_BYTE: u8 = 0;
/// Number of leading bytes `parse` removes from a datum when `strip_schema_id_prefix` is enabled.
// NOTE(review): presumably 1 magic byte + 4 bytes of schema id — confirm.
const CONFLUENT_SCHEMA_PREFIX_LEN: usize = 5;
22
/// Configuration from which an [`AvroDeserializer`] is built.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AvroDeserializerConfig {
    /// Avro-specific settings: the schema text and the prefix-stripping flag.
    pub avro_options: AvroDeserializerOptions,
}
29
30impl AvroDeserializerConfig {
31 pub const fn new(schema: String, strip_schema_id_prefix: bool) -> Self {
33 Self {
34 avro_options: AvroDeserializerOptions {
35 schema,
36 strip_schema_id_prefix,
37 },
38 }
39 }
40
41 pub fn build(&self) -> AvroDeserializer {
43 let schema = apache_avro::Schema::parse_str(&self.avro_options.schema)
44 .map_err(|error| format!("Failed building Avro serializer: {error}"))
45 .unwrap();
46 AvroDeserializer {
47 schema,
48 strip_schema_id_prefix: self.avro_options.strip_schema_id_prefix,
49 }
50 }
51
52 pub fn output_type(&self) -> DataType {
54 DataType::Log
55 }
56
57 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
59 match log_namespace {
60 LogNamespace::Legacy => {
61 let mut definition = schema::Definition::empty_legacy_namespace()
62 .unknown_fields(vrl::value::Kind::any());
63
64 if let Some(timestamp_key) = log_schema().timestamp_key() {
65 definition = definition.try_with_field(
66 timestamp_key,
67 vrl::value::Kind::any().or_timestamp(),
68 Some("timestamp"),
69 );
70 }
71 definition
72 }
73 LogNamespace::Vector => schema::Definition::new_with_default_metadata(
74 vrl::value::Kind::any(),
75 [log_namespace],
76 ),
77 }
78 }
79}
80
81impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
82 fn from(value: &AvroDeserializerOptions) -> Self {
83 Self {
84 schema: value.schema.clone(),
85 }
86 }
87}
/// Options for the Avro deserializer.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct AvroDeserializerOptions {
    /// The Avro schema definition, given as schema text that is parsed when the deserializer is built.
    // NOTE(review): the description below lists "local-timestamp-millis" and
    // "local-timestamp-micros" as unsupported, but `try_from` in this file converts
    // `LocalTimestampMillis`/`LocalTimestampMicros` and instead rejects `Date`, `Decimal`,
    // `Duration`, `Fixed` and `TimeMillis` — confirm which list is intended.
    #[configurable(metadata(
        docs::examples = r#"{ "type": "record", "name": "log", "fields": [{ "name": "message", "type": "string" }] }"#,
        docs::additional_props_description = r#"Supports most avro data types, unsupported data types includes
 ["decimal", "duration", "local-timestamp-millis", "local-timestamp-micros"]"#,
    ))]
    pub schema: String,

    /// When `true`, every datum must begin with a schema id prefix, which is removed before decoding; a datum without the prefix is rejected.
    pub strip_schema_id_prefix: bool,
}
110
/// Deserializer that turns an Avro-encoded datum into a log event.
#[derive(Debug, Clone)]
pub struct AvroDeserializer {
    /// Schema passed to `apache_avro::from_avro_datum` when decoding.
    schema: apache_avro::Schema,
    /// Whether `parse` must find and remove the schema id prefix before decoding.
    strip_schema_id_prefix: bool,
}
117
118impl AvroDeserializer {
119 pub const fn new(schema: apache_avro::Schema, strip_schema_id_prefix: bool) -> Self {
121 Self {
122 schema,
123 strip_schema_id_prefix,
124 }
125 }
126}
127
128impl Deserializer for AvroDeserializer {
129 fn parse(
130 &self,
131 bytes: Bytes,
132 log_namespace: LogNamespace,
133 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
134 if bytes.is_empty() {
136 return Ok(smallvec![]);
137 }
138
139 let bytes = if self.strip_schema_id_prefix {
140 if bytes.len() >= CONFLUENT_SCHEMA_PREFIX_LEN && bytes[0] == CONFLUENT_MAGIC_BYTE {
141 bytes.slice(CONFLUENT_SCHEMA_PREFIX_LEN..)
142 } else {
143 return Err(vector_common::Error::from(
144 "Expected avro datum to be prefixed with schema id",
145 ));
146 }
147 } else {
148 bytes
149 };
150
151 let value = apache_avro::from_avro_datum(&self.schema, &mut bytes.reader(), None)?;
152
153 let apache_avro::types::Value::Record(fields) = value else {
154 return Err(vector_common::Error::from("Expected an avro Record"));
155 };
156
157 let mut log = LogEvent::default();
158 for (k, v) in fields {
159 log.insert(event_path!(k.as_str()), try_from(v)?);
160 }
161
162 let mut event = Event::Log(log);
163 let event = match log_namespace {
164 LogNamespace::Vector => event,
165 LogNamespace::Legacy => {
166 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
167 let log = event.as_mut_log();
168 if !log.contains(timestamp_key) {
169 let timestamp = Utc::now();
170 log.insert(timestamp_key, timestamp);
171 }
172 }
173 event
174 }
175 };
176 Ok(smallvec![event])
177 }
178}
179
180pub fn try_from(value: AvroValue) -> vector_common::Result<VrlValue> {
182 match value {
185 AvroValue::Array(array) => {
186 let mut vector = Vec::new();
187 for item in array {
188 vector.push(try_from(item)?);
189 }
190 Ok(VrlValue::Array(vector))
191 }
192 AvroValue::Boolean(boolean) => Ok(VrlValue::from(boolean)),
193 AvroValue::Bytes(bytes) => Ok(VrlValue::from(bytes)),
194 AvroValue::Date(_) => Err(vector_common::Error::from(
195 "AvroValue::Date is not supported",
196 )),
197 AvroValue::Decimal(_) => Err(vector_common::Error::from(
198 "AvroValue::Decimal is not supported",
199 )),
200 AvroValue::Double(double) => Ok(VrlValue::from_f64_or_zero(double)),
201 AvroValue::Duration(_) => Err(vector_common::Error::from(
202 "AvroValue::Duration is not supported",
203 )),
204 AvroValue::Enum(_, string) => Ok(VrlValue::from(string)),
205 AvroValue::Fixed(_, _) => Err(vector_common::Error::from(
206 "AvroValue::Fixed is not supported",
207 )),
208 AvroValue::Float(float) => Ok(VrlValue::from_f64_or_zero(float as f64)),
209 AvroValue::Int(int) => Ok(VrlValue::from(int)),
210 AvroValue::Long(long) => Ok(VrlValue::from(long)),
211 AvroValue::Map(items) => items
212 .into_iter()
213 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
214 .collect::<Result<Vec<_>, _>>()
215 .map(|v| VrlValue::Object(v.into_iter().collect())),
216 AvroValue::Null => Ok(VrlValue::Null),
217 AvroValue::Record(items) => items
218 .into_iter()
219 .map(|(key, value)| try_from(value).map(|v| (KeyString::from(key), v)))
220 .collect::<Result<Vec<_>, _>>()
221 .map(|v| VrlValue::Object(v.into_iter().collect())),
222 AvroValue::String(string) => Ok(VrlValue::from(string)),
223 AvroValue::TimeMicros(time_micros) => Ok(VrlValue::from(time_micros)),
224 AvroValue::TimeMillis(_) => Err(vector_common::Error::from(
225 "AvroValue::TimeMillis is not supported",
226 )),
227 AvroValue::TimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
228 AvroValue::TimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
229 AvroValue::Union(_, v) => try_from(*v),
230 AvroValue::Uuid(uuid) => Ok(VrlValue::from(uuid.as_hyphenated().to_string())),
231 AvroValue::LocalTimestampMillis(ts_millis) => Ok(VrlValue::from(ts_millis)),
232 AvroValue::LocalTimestampMicros(ts_micros) => Ok(VrlValue::from(ts_micros)),
233 }
234}
235
#[cfg(test)]
mod tests {
    use apache_avro::Schema;
    use bytes::BytesMut;
    use uuid::Uuid;

    use super::*;

    /// Minimal record matching the schema returned by `get_schema`.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Log {
        message: String,
    }

    /// Parses the single-field `log` record schema shared by all tests.
    fn get_schema() -> Schema {
        let schema = String::from(
            r#"{
                "type": "record",
                "name": "log",
                "fields": [
                    {
                        "name": "message",
                        "type": "string"
                    }
                ]
            }
            "#,
        );

        Schema::parse_str(&schema).unwrap()
    }

    /// Encodes a `Log` carrying `message` as a bare Avro datum under `schema`.
    fn encode_log(schema: &Schema, message: &str) -> Vec<u8> {
        let record = apache_avro::to_value(Log {
            message: message.to_owned(),
        })
        .unwrap();
        apache_avro::to_avro_datum(schema, record).unwrap()
    }

    /// Prepends the five zero bytes that stand in for the schema id prefix.
    fn with_schema_id_prefix(datum: Vec<u8>) -> Bytes {
        let mut framed = BytesMut::new();
        framed.extend([0, 0, 0, 0, 0]);
        framed.extend(datum);
        framed.freeze()
    }

    /// Asserts that exactly one event was produced and that its `message` field equals `expected`.
    fn assert_single_message(events: &[Event], expected: &str) {
        assert_eq!(events.len(), 1);

        assert_eq!(
            events[0].as_log().get("message").unwrap(),
            &VrlValue::from(expected)
        );
    }

    #[test]
    fn deserialize_avro() {
        let schema = get_schema();
        let input = Bytes::from(encode_log(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, false)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_strip_schema_id_prefix() {
        let schema = get_schema();
        let input = with_schema_id_prefix(encode_log(&schema, "hello from avro"));

        let events = AvroDeserializer::new(schema, true)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, "hello from avro");
    }

    #[test]
    fn deserialize_avro_uuid() {
        let schema = get_schema();
        let uuid = Uuid::new_v4().hyphenated().to_string();
        let input = with_schema_id_prefix(encode_log(&schema, &uuid));

        let events = AvroDeserializer::new(schema, true)
            .parse(input, LogNamespace::Vector)
            .unwrap();

        assert_single_message(&events, &uuid);
    }
}