1use arrow::{
8 datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9 error::ArrowError,
10 ipc::writer::StreamWriter,
11 json::reader::ReaderBuilder,
12 record_batch::RecordBatch,
13};
14use async_trait::async_trait;
15use bytes::{BufMut, Bytes, BytesMut};
16use snafu::{ResultExt, Snafu, ensure};
17use vector_config::configurable_component;
18use vector_core::event::Event;
19
/// Supplies the Arrow schema used when encoding events.
///
/// Implementations may resolve the schema asynchronously (e.g. from a remote
/// catalog); failures are reported as [`ArrowEncodingError`].
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Returns the Arrow [`Schema`] that events should be encoded against.
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
31
/// Configuration for the Arrow IPC stream serializer.
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema to encode events against.
    ///
    /// Skipped during (de)serialization of the user config; it must be
    /// supplied programmatically (e.g. via a `SchemaProvider`).
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// When `true`, every field of the schema (recursively) is rewritten to be
    /// nullable before encoding, so events with missing or null values do not
    /// fail the null-constraint check.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
53
54impl std::fmt::Debug for ArrowStreamSerializerConfig {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("ArrowStreamSerializerConfig")
57 .field(
58 "schema",
59 &self
60 .schema
61 .as_ref()
62 .map(|s| format!("{} fields", s.fields().len())),
63 )
64 .field("allow_nullable_fields", &self.allow_nullable_fields)
65 .finish()
66 }
67}
68
69impl ArrowStreamSerializerConfig {
70 pub fn new(schema: arrow::datatypes::Schema) -> Self {
72 Self {
73 schema: Some(schema),
74 allow_nullable_fields: false,
75 }
76 }
77
78 pub fn input_type(&self) -> vector_core::config::DataType {
80 vector_core::config::DataType::Log
81 }
82
83 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
85 vector_core::schema::Requirement::empty()
86 }
87}
88
/// Serializes batches of log events into the Arrow IPC stream format.
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    // Shared, immutable schema applied to every encoded batch.
    schema: SchemaRef,
}
94
95impl ArrowStreamSerializer {
96 pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
98 let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
99
100 let schema = if config.allow_nullable_fields {
103 let nullable_fields: Fields = schema
104 .fields()
105 .iter()
106 .map(|f| make_field_nullable(f))
107 .collect::<Result<Vec<_>, _>>()?
108 .into();
109 Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
110 } else {
111 schema
112 };
113
114 Ok(Self {
115 schema: SchemaRef::new(schema),
116 })
117 }
118}
119
120impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
121 type Error = ArrowEncodingError;
122
123 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
124 if events.is_empty() {
125 return Err(ArrowEncodingError::NoEvents);
126 }
127
128 let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
129
130 buffer.extend_from_slice(&bytes);
131 Ok(())
132 }
133}
134
/// Errors that can occur while encoding events into the Arrow IPC format.
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// The Arrow JSON decoder could not assemble a record batch.
    #[snafu(display("Failed to create Arrow record batch: {source}"))]
    RecordBatchCreation {
        source: arrow::error::ArrowError,
    },

    /// Writing the IPC stream (header, batch, or footer) failed.
    #[snafu(display("Failed to write Arrow IPC data: {source}"))]
    IpcWrite {
        source: arrow::error::ArrowError,
    },

    /// An empty event batch was supplied; there is nothing to encode.
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// A `SchemaProvider` failed to deliver a schema.
    #[snafu(display("Failed to fetch schema from provider: {message}"))]
    SchemaFetchError {
        message: String,
    },

    /// A non-nullable field was missing or null in at least one event.
    /// `field_name` may be a comma-separated list of offending fields.
    #[snafu(display("Null value for non-nullable field '{field_name}'"))]
    NullConstraint {
        field_name: String,
    },

    /// The serializer was constructed without a schema.
    #[snafu(display("Arrow serializer requires a schema"))]
    MissingSchema,

    /// Wrapped I/O error; `context(false)` lets `?` convert it implicitly.
    #[snafu(display("IO error: {source}"), context(false))]
    Io {
        source: std::io::Error,
    },

    /// The Arrow JSON decoder rejected the serialized event data.
    #[snafu(display("Arrow JSON decoding error: {source}"))]
    ArrowJsonDecode {
        source: arrow::error::ArrowError,
    },

    /// A `Map` field's inner layout did not match the Arrow spec
    /// (struct of exactly key and value).
    #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
    InvalidMapSchema {
        field_name: String,
        reason: String,
    },
}
197
198pub fn encode_events_to_arrow_ipc_stream(
200 events: &[Event],
201 schema: SchemaRef,
202) -> Result<Bytes, ArrowEncodingError> {
203 if events.is_empty() {
204 return Err(ArrowEncodingError::NoEvents);
205 }
206
207 let json_values = vector_log_events_to_json_values(events).map_err(|e| {
208 ArrowEncodingError::RecordBatchCreation {
209 source: ArrowError::JsonError(e.to_string()),
210 }
211 })?;
212
213 let record_batch = build_record_batch(schema, &json_values)?;
214
215 let mut buffer = BytesMut::new().writer();
216 let mut writer =
217 StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
218 writer.write(&record_batch).context(IpcWriteSnafu)?;
219 writer.finish().context(IpcWriteSnafu)?;
220
221 Ok(buffer.into_inner().freeze())
222}
223
/// Recursively rewrites `field` so that it, and any nested fields, accept
/// null values.
///
/// Handling by type:
/// - `List`: the item field is made nullable recursively.
/// - `Struct`: every child field is made nullable recursively.
/// - `Map`: per the Arrow spec the entries struct and the key field stay
///   non-nullable; only the value field is relaxed. Returns
///   `InvalidMapSchema` when the map's inner layout is not a two-field struct.
/// - Anything else keeps its data type unchanged.
///
/// The returned field itself is always marked nullable.
fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
    let new_data_type = match field.data_type() {
        DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
        DataType::Struct(fields) => DataType::Struct(
            fields
                .iter()
                .map(|f| make_field_nullable(f))
                .collect::<Result<Vec<_>, _>>()?
                .into(),
        ),
        DataType::Map(inner, sorted) => {
            // A Map's inner type must be a struct of exactly (key, value).
            let DataType::Struct(fields) = inner.data_type() else {
                return InvalidMapSchemaSnafu {
                    field_name: field.name(),
                    reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
                }
                .fail();
            };

            ensure!(
                fields.len() == 2,
                InvalidMapSchemaSnafu {
                    field_name: field.name(),
                    reason: format!("expected 2 fields (key, value), found {}", fields.len()),
                },
            );
            let key_field = &fields[0];
            let value_field = &fields[1];

            // Keys stay as-is (non-nullable); only the value side is relaxed.
            let new_struct_fields: Fields =
                [key_field.clone(), make_field_nullable(value_field)?.into()].into();

            // The entries field itself must remain non-nullable per the spec.
            let new_inner_field = inner
                .as_ref()
                .clone()
                .with_data_type(DataType::Struct(new_struct_fields))
                .with_nullable(false);

            DataType::Map(new_inner_field.into(), *sorted)
        }
        other => other.clone(),
    };

    Ok(field
        .clone()
        .with_data_type(new_data_type)
        .with_nullable(true))
}
276
277pub fn find_null_non_nullable_fields<'a>(
280 schema: &'a Schema,
281 values: &[serde_json::Value],
282) -> Vec<&'a str> {
283 schema
284 .fields()
285 .iter()
286 .filter(|field| {
287 !field.is_nullable()
288 && values.iter().any(|value| {
289 value
290 .as_object()
291 .and_then(|map| map.get(field.name().as_str()))
292 .is_none_or(serde_json::Value::is_null)
293 })
294 })
295 .map(|field| field.name().as_str())
296 .collect()
297}
298
299pub(crate) fn vector_log_events_to_json_values(
300 events: &[Event],
301) -> Result<Vec<serde_json::Value>, serde_json::Error> {
302 events
303 .iter()
304 .filter_map(Event::maybe_as_log)
305 .map(serde_json::to_value)
306 .collect()
307}
308
309pub(crate) fn build_record_batch(
311 schema: SchemaRef,
312 values: &[serde_json::Value],
313) -> Result<RecordBatch, ArrowEncodingError> {
314 if values.is_empty() {
315 return Err(ArrowEncodingError::NoEvents);
316 }
317
318 let missing = find_null_non_nullable_fields(&schema, values);
319 if !missing.is_empty() {
320 let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
321 field_name: missing.join(", "),
322 });
323 vector_common::internal_event::emit(crate::internal_events::EncoderNullConstraintError {
324 error: &error,
325 count: values.len(),
326 });
327 return Err(ArrowEncodingError::NullConstraint {
328 field_name: missing.join(", "),
329 });
330 }
331
332 let mut decoder = ReaderBuilder::new(schema)
333 .build_decoder()
334 .context(RecordBatchCreationSnafu)?;
335
336 decoder.serialize(values).context(ArrowJsonDecodeSnafu)?;
337
338 decoder
339 .flush()
340 .context(ArrowJsonDecodeSnafu)?
341 .ok_or(ArrowEncodingError::NoEvents)
342}
343
344#[cfg(test)]
345mod tests {
346 use super::*;
347 use arrow::{
348 array::{Array, AsArray},
349 datatypes::TimeUnit,
350 ipc::reader::StreamReader,
351 };
352 use chrono::Utc;
353 use std::io::Cursor;
354 use vector_core::event::{LogEvent, Value};
355
356 fn encode_and_decode(
358 events: Vec<Event>,
359 schema: SchemaRef,
360 ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
361 let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
362 let cursor = Cursor::new(bytes);
363 let mut reader = StreamReader::try_new(cursor, None)?;
364 Ok(reader.next().unwrap()?)
365 }
366
367 fn create_event<V>(fields: Vec<(&str, V)>) -> Event
369 where
370 V: Into<Value>,
371 {
372 let mut log = LogEvent::default();
373 for (key, value) in fields {
374 log.insert(key, value.into());
375 }
376 Event::Log(log)
377 }
378
    mod comprehensive {
        use super::*;

        /// Round-trips a single event containing every supported Arrow type
        /// (scalars, timestamp, decimal, list, structs, map) and verifies each
        /// decoded column value.
        #[test]
        fn test_encode_all_types() {
            use arrow::datatypes::{
                Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
                Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
            };
            use vrl::value::ObjectMap;

            let now = Utc::now();

            // Struct with positional-style field names (f0, f1).
            let mut tuple_value = ObjectMap::new();
            tuple_value.insert("f0".into(), Value::Bytes("nested_str".into()));
            tuple_value.insert("f1".into(), Value::Integer(999));

            // Struct with descriptive field names.
            let mut named_tuple_value = ObjectMap::new();
            named_tuple_value.insert("category".into(), Value::Bytes("test_category".into()));
            named_tuple_value.insert("tag".into(), Value::Bytes("test_tag".into()));

            let list_value = Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]);

            let mut map_value = ObjectMap::new();
            map_value.insert("key1".into(), Value::Integer(100));
            map_value.insert("key2".into(), Value::Integer(200));

            let mut log = LogEvent::default();
            log.insert("string_field", "test");
            log.insert("int8_field", 127);
            log.insert("int16_field", 32000);
            log.insert("int32_field", 1000000);
            log.insert("int64_field", 42);
            log.insert("uint8_field", 255);
            log.insert("uint16_field", 65535);
            log.insert("uint32_field", 4000000);
            log.insert("uint64_field", 9000000000_i64);
            log.insert("float32_field", 3.15);
            log.insert("float64_field", 3.15);
            log.insert("bool_field", true);
            log.insert("timestamp_field", now);
            log.insert("decimal_field", 99.99);
            log.insert("list_field", list_value);
            log.insert("struct_field", Value::Object(tuple_value));
            log.insert("named_struct_field", Value::Object(named_tuple_value));
            log.insert("map_field", Value::Object(map_value));

            let events = vec![Event::Log(log)];

            let struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("f0", DataType::Utf8, true),
                Field::new("f1", DataType::Int64, true),
            ]);

            let named_struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("category", DataType::Utf8, true),
                Field::new("tag", DataType::Utf8, true),
            ]);

            // Map entries: per the Arrow spec, the entries struct and keys are
            // non-nullable while values may be null.
            let map_entries = Field::new(
                "entries",
                DataType::Struct(arrow::datatypes::Fields::from(vec![
                    Field::new("keys", DataType::Utf8, false),
                    Field::new("values", DataType::Int64, true),
                ])),
                false,
            );

            let schema = Schema::new(vec![
                Field::new("string_field", DataType::Utf8, true),
                Field::new("int8_field", DataType::Int8, true),
                Field::new("int16_field", DataType::Int16, true),
                Field::new("int32_field", DataType::Int32, true),
                Field::new("int64_field", DataType::Int64, true),
                Field::new("uint8_field", DataType::UInt8, true),
                Field::new("uint16_field", DataType::UInt16, true),
                Field::new("uint32_field", DataType::UInt32, true),
                Field::new("uint64_field", DataType::UInt64, true),
                Field::new("float32_field", DataType::Float32, true),
                Field::new("float64_field", DataType::Float64, true),
                Field::new("bool_field", DataType::Boolean, true),
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new("decimal_field", DataType::Decimal128(10, 2), true),
                Field::new(
                    "list_field",
                    DataType::List(Field::new("item", DataType::Int64, true).into()),
                    true,
                ),
                Field::new("struct_field", DataType::Struct(struct_fields), true),
                Field::new(
                    "named_struct_field",
                    DataType::Struct(named_struct_fields),
                    true,
                ),
                Field::new("map_field", DataType::Map(map_entries.into(), false), true),
            ])
            .into();

            let batch = encode_and_decode(events, schema).expect("Failed to encode");

            assert_eq!(batch.num_rows(), 1);
            assert_eq!(batch.num_columns(), 18);

            // Scalar columns.
            assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
            assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
            assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
            assert_eq!(
                batch.column(3).as_primitive::<Int32Type>().value(0),
                1000000
            );
            assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
            assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
            assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
            assert_eq!(
                batch.column(7).as_primitive::<UInt32Type>().value(0),
                4000000
            );
            assert_eq!(
                batch.column(8).as_primitive::<UInt64Type>().value(0),
                9000000000
            );
            assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
            assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
            assert!(batch.column(11).as_boolean().value(0));
            assert_eq!(
                batch
                    .column(12)
                    .as_primitive::<TimestampMillisecondType>()
                    .value(0),
                now.timestamp_millis()
            );
            // 99.99 at scale 2 is stored as the unscaled integer 9999.
            assert_eq!(
                batch.column(13).as_primitive::<Decimal128Type>().value(0),
                9999
            );

            // List column.
            let list_array = batch.column(14).as_list::<i32>();
            assert!(!list_array.is_null(0));
            let list_values = list_array.value(0);
            assert_eq!(list_values.len(), 3);
            let int_array = list_values.as_primitive::<Int64Type>();
            assert_eq!(int_array.value(0), 1);
            assert_eq!(int_array.value(1), 2);
            assert_eq!(int_array.value(2), 3);

            // Positional struct column.
            let struct_array = batch.column(15).as_struct();
            assert!(!struct_array.is_null(0));
            assert_eq!(
                struct_array.column(0).as_string::<i32>().value(0),
                "nested_str"
            );
            assert_eq!(
                struct_array.column(1).as_primitive::<Int64Type>().value(0),
                999
            );

            // Named struct column.
            let named_struct_array = batch.column(16).as_struct();
            assert!(!named_struct_array.is_null(0));
            assert_eq!(
                named_struct_array.column(0).as_string::<i32>().value(0),
                "test_category"
            );
            assert_eq!(
                named_struct_array.column(1).as_string::<i32>().value(0),
                "test_tag"
            );

            // Map column.
            let map_array = batch.column(17).as_map();
            assert!(!map_array.is_null(0));
            let map_value = map_array.value(0);
            assert_eq!(map_value.len(), 2);
        }
    }
571
    mod error_handling {
        use super::*;

        /// An empty batch must be rejected with `NoEvents`.
        #[test]
        fn test_encode_empty_events() {
            let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
            let events: Vec<Event> = vec![];
            let result = encode_events_to_arrow_ipc_stream(&events, schema);
            assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
        }

        /// Encoding must fail when a non-nullable schema field is absent from
        /// the event.
        #[test]
        fn test_missing_non_nullable_field_errors() {
            let events = vec![create_event(vec![("other_field", "value")])];

            let schema = Schema::new(vec![Field::new(
                "required_field",
                DataType::Utf8,
                false,
            )])
            .into();

            let result = encode_events_to_arrow_ipc_stream(&events, schema);
            assert!(result.is_err());
        }
    }
598
    mod temporal_types {
        use super::*;
        use arrow::datatypes::{
            TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
            TimestampSecondType,
        };

        /// The same instant must encode correctly at all four Arrow timestamp
        /// precisions.
        #[test]
        fn test_encode_timestamp_precisions() {
            let now = Utc::now();
            let mut log = LogEvent::default();
            log.insert("ts_second", now);
            log.insert("ts_milli", now);
            log.insert("ts_micro", now);
            log.insert("ts_nano", now);

            let events = vec![Event::Log(log)];

            let schema = Schema::new(vec![
                Field::new(
                    "ts_second",
                    DataType::Timestamp(TimeUnit::Second, None),
                    true,
                ),
                Field::new(
                    "ts_milli",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "ts_micro",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                Field::new(
                    "ts_nano",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
            ])
            .into();

            let batch = encode_and_decode(events, schema).unwrap();

            assert_eq!(batch.num_rows(), 1);
            assert_eq!(batch.num_columns(), 4);

            let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
            assert!(!ts_second.is_null(0));
            assert_eq!(ts_second.value(0), now.timestamp());

            let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
            assert!(!ts_milli.is_null(0));
            assert_eq!(ts_milli.value(0), now.timestamp_millis());

            let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
            assert!(!ts_micro.is_null(0));
            assert_eq!(ts_micro.value(0), now.timestamp_micros());

            let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
            assert!(!ts_nano.is_null(0));
            assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
        }

        /// A timestamp column must accept RFC 3339 strings, native timestamp
        /// values, and raw integer nanoseconds in the same batch.
        #[test]
        fn test_encode_mixed_timestamp_string_native_and_integer() {
            let now = Utc::now();

            // Event 1: timestamp as an RFC 3339 string.
            let mut log1 = LogEvent::default();
            log1.insert("ts", "2025-10-22T10:18:44.256Z");
            // Event 2: native timestamp value.
            let mut log2 = LogEvent::default();
            log2.insert("ts", now);
            // Event 3: raw integer nanoseconds since the epoch.
            let mut log3 = LogEvent::default();
            log3.insert("ts", 1729594724256000000_i64);
            let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

            let schema = Schema::new(vec![Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
                true,
            )])
            .into();

            let batch = encode_and_decode(events, schema).unwrap();

            assert_eq!(batch.num_rows(), 3);

            let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();

            assert!(!ts_array.is_null(0));
            assert!(!ts_array.is_null(1));
            assert!(!ts_array.is_null(2));

            let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(ts_array.value(0), expected);

            assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());

            assert_eq!(ts_array.value(2), 1729594724256000000_i64);
        }
    }
710
    mod config_tests {
        use super::*;
        use tokio_util::codec::Encoder;

        /// With `allow_nullable_fields`, a strict schema is relaxed so events
        /// missing a non-nullable field encode as nulls instead of erroring.
        #[test]
        fn test_config_allow_nullable_fields_overrides_schema() {
            let mut log1 = LogEvent::default();
            log1.insert("strict_field", 42);
            // log2 intentionally lacks `strict_field`.
            let log2 = LogEvent::default();
            let events = vec![Event::Log(log1), Event::Log(log2)];

            let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);

            let mut config = ArrowStreamSerializerConfig::new(schema);
            config.allow_nullable_fields = true;

            let mut serializer =
                ArrowStreamSerializer::new(config).expect("Failed to create serializer");

            let mut buffer = BytesMut::new();
            serializer
                .encode(events, &mut buffer)
                .expect("Encoding should succeed when allow_nullable_fields is true");

            let cursor = Cursor::new(buffer);
            let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
            let batch = reader.next().unwrap().expect("Failed to read batch");

            assert_eq!(batch.num_rows(), 2);

            let binding = batch.schema();
            let output_field = binding.field(0);
            assert!(
                output_field.is_nullable(),
                "The output schema field should have been transformed to nullable=true"
            );

            let array = batch
                .column(0)
                .as_primitive::<arrow::datatypes::Int64Type>();

            assert_eq!(array.value(0), 42);
            assert!(!array.is_null(0));
            assert!(
                array.is_null(1),
                "The missing value should be encoded as null"
            );
        }

        /// Nullability must be applied recursively through struct-in-list-in-
        /// struct nesting.
        #[test]
        fn test_make_field_nullable_with_nested_types() {
            let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
            let inner_struct =
                DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
            let list_field = Field::new("item", inner_struct, false);
            let list_type = DataType::List(list_field.into());
            let outer_field = Field::new("inner_list", list_type, false);
            let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));

            let original_field = Field::new("root", outer_struct, false);
            let nullable_field = make_field_nullable(&original_field).unwrap();

            assert!(
                nullable_field.is_nullable(),
                "Root field should be nullable"
            );

            if let DataType::Struct(root_fields) = nullable_field.data_type() {
                let inner_list_field = &root_fields[0];
                assert!(inner_list_field.is_nullable());

                if let DataType::List(list_item_field) = inner_list_field.data_type() {
                    assert!(list_item_field.is_nullable());

                    if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
                        let nested_field = &inner_struct_fields[0];
                        assert!(nested_field.is_nullable());
                    } else {
                        panic!("Expected Struct type for list items");
                    }
                } else {
                    panic!("Expected List type for inner_list");
                }
            } else {
                panic!("Expected Struct type for root field");
            }
        }

        /// For Map fields, only the value side becomes nullable; the entries
        /// struct and key field must stay non-nullable per the Arrow spec.
        #[test]
        fn test_make_field_nullable_with_map_type() {
            let key_field = Field::new("key", DataType::Utf8, false);
            let value_field = Field::new("value", DataType::Int64, false);
            let entries_struct =
                DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
            let entries_field = Field::new("entries", entries_struct, false);
            let map_type = DataType::Map(entries_field.into(), false);

            let original_field = Field::new("my_map", map_type, false);
            let nullable_field = make_field_nullable(&original_field).unwrap();

            assert!(
                nullable_field.is_nullable(),
                "Root map field should be nullable"
            );

            if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
                assert!(
                    !entries_field.is_nullable(),
                    "Map entries field should be non-nullable"
                );

                if let DataType::Struct(struct_fields) = entries_field.data_type() {
                    let key_field = &struct_fields[0];
                    let value_field = &struct_fields[1];
                    assert!(
                        !key_field.is_nullable(),
                        "Map key field should be non-nullable"
                    );
                    assert!(
                        value_field.is_nullable(),
                        "Map value field should be nullable"
                    );
                } else {
                    panic!("Expected Struct type for map entries");
                }
            } else {
                panic!("Expected Map type for my_map field");
            }
        }
    }
841
    mod null_non_nullable {
        use super::*;

        /// The error for a missing required field must name exactly the
        /// violating field(s).
        #[test]
        fn test_missing_non_nullable_field_error_names_fields() {
            let schema: SchemaRef = Schema::new(vec![
                Field::new("required_field", DataType::Utf8, false),
                Field::new("optional_field", DataType::Utf8, true),
            ])
            .into();

            let event = create_event(vec![("optional_field", "hello")]);

            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
            let err = result.unwrap_err().to_string();
            assert!(
                err.contains("required_field"),
                "Error should name the missing field, got: {err}"
            );
            assert!(
                !err.contains("optional_field"),
                "Error should not name nullable fields, got: {err}"
            );
        }

        /// A field that is present in no event at all counts as null for the
        /// non-nullable check.
        #[test]
        fn test_null_value_in_non_nullable_field_error_names_fields() {
            let schema: SchemaRef = Schema::new(vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
            ])
            .into();

            let event = create_event(vec![("id", Value::Integer(1))]);

            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
            let err = result.unwrap_err().to_string();
            assert!(
                err.contains("name"),
                "Error should name the null field, got: {err}"
            );
        }

        /// No violations are reported when all non-nullable fields are
        /// present and non-null.
        #[test]
        fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
            let schema = Schema::new(vec![
                Field::new("a", DataType::Utf8, false),
                Field::new("b", DataType::Int64, false),
            ]);

            let event = create_event(vec![
                ("a", Value::Bytes("val".into())),
                ("b", Value::Integer(42)),
            ]);
            let missing = find_null_non_nullable_fields(
                &schema,
                &vector_log_events_to_json_values(&[event]).unwrap(),
            );
            assert!(
                missing.is_empty(),
                "Expected no missing fields, got: {missing:?}"
            );
        }

        /// An explicit JSON `null` (not just a missing key) must be detected.
        #[test]
        fn test_find_null_non_nullable_fields_detects_explicit_null() {
            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);

            let event = create_event(vec![("a", Value::Null)]);
            let missing = find_null_non_nullable_fields(
                &schema,
                &vector_log_events_to_json_values(&[event]).unwrap(),
            );
            assert_eq!(missing, vec!["a"]);
        }
    }
920}