1use arrow::{
8 datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9 ipc::writer::StreamWriter,
10 json::reader::ReaderBuilder,
11 record_batch::RecordBatch,
12};
13use async_trait::async_trait;
14use bytes::{BufMut, Bytes, BytesMut};
15use snafu::{ResultExt, Snafu, ensure};
16use vector_config::configurable_component;
17use vector_core::event::Event;
18
19#[async_trait]
23pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
24 async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
29}
30
31#[configurable_component]
33#[derive(Clone, Default)]
34pub struct ArrowStreamSerializerConfig {
35 #[serde(skip)]
37 #[configurable(derived)]
38 pub schema: Option<arrow::datatypes::Schema>,
39
40 #[serde(default)]
49 #[configurable(derived)]
50 pub allow_nullable_fields: bool,
51}
52
53impl std::fmt::Debug for ArrowStreamSerializerConfig {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("ArrowStreamSerializerConfig")
56 .field(
57 "schema",
58 &self
59 .schema
60 .as_ref()
61 .map(|s| format!("{} fields", s.fields().len())),
62 )
63 .field("allow_nullable_fields", &self.allow_nullable_fields)
64 .finish()
65 }
66}
67
68impl ArrowStreamSerializerConfig {
69 pub fn new(schema: arrow::datatypes::Schema) -> Self {
71 Self {
72 schema: Some(schema),
73 allow_nullable_fields: false,
74 }
75 }
76
77 pub fn input_type(&self) -> vector_core::config::DataType {
79 vector_core::config::DataType::Log
80 }
81
82 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
84 vector_core::schema::Requirement::empty()
85 }
86}
87
88#[derive(Clone, Debug)]
90pub struct ArrowStreamSerializer {
91 schema: SchemaRef,
92}
93
94impl ArrowStreamSerializer {
95 pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
97 let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
98
99 let schema = if config.allow_nullable_fields {
102 let nullable_fields: Fields = schema
103 .fields()
104 .iter()
105 .map(|f| make_field_nullable(f))
106 .collect::<Result<Vec<_>, _>>()?
107 .into();
108 Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
109 } else {
110 schema
111 };
112
113 Ok(Self {
114 schema: SchemaRef::new(schema),
115 })
116 }
117}
118
119impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
120 type Error = ArrowEncodingError;
121
122 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
123 if events.is_empty() {
124 return Err(ArrowEncodingError::NoEvents);
125 }
126
127 let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
128
129 buffer.extend_from_slice(&bytes);
130 Ok(())
131 }
132}
133
134#[derive(Debug, Snafu)]
136pub enum ArrowEncodingError {
137 #[snafu(display("Failed to create Arrow record batch: {source}"))]
139 RecordBatchCreation {
140 source: arrow::error::ArrowError,
142 },
143
144 #[snafu(display("Failed to write Arrow IPC data: {source}"))]
146 IpcWrite {
147 source: arrow::error::ArrowError,
149 },
150
151 #[snafu(display("No events provided for encoding"))]
153 NoEvents,
154
155 #[snafu(display("Failed to fetch schema from provider: {message}"))]
157 SchemaFetchError {
158 message: String,
160 },
161
162 #[snafu(display("Null value for non-nullable field '{field_name}'"))]
164 NullConstraint {
165 field_name: String,
167 },
168
169 #[snafu(display("Arrow serializer requires a schema"))]
171 MissingSchema,
172
173 #[snafu(display("IO error: {source}"), context(false))]
175 Io {
176 source: std::io::Error,
178 },
179
180 #[snafu(display("Arrow JSON decoding error: {source}"))]
182 ArrowJsonDecode {
183 source: arrow::error::ArrowError,
185 },
186
187 #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
189 InvalidMapSchema {
190 field_name: String,
192 reason: String,
194 },
195}
196
197pub fn encode_events_to_arrow_ipc_stream(
199 events: &[Event],
200 schema: SchemaRef,
201) -> Result<Bytes, ArrowEncodingError> {
202 if events.is_empty() {
203 return Err(ArrowEncodingError::NoEvents);
204 }
205
206 let record_batch = build_record_batch(schema, events)?;
207
208 let mut buffer = BytesMut::new().writer();
209 let mut writer =
210 StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
211 writer.write(&record_batch).context(IpcWriteSnafu)?;
212 writer.finish().context(IpcWriteSnafu)?;
213
214 Ok(buffer.into_inner().freeze())
215}
216
217fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
219 let new_data_type = match field.data_type() {
220 DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
221 DataType::Struct(fields) => DataType::Struct(
222 fields
223 .iter()
224 .map(|f| make_field_nullable(f))
225 .collect::<Result<Vec<_>, _>>()?
226 .into(),
227 ),
228 DataType::Map(inner, sorted) => {
229 let DataType::Struct(fields) = inner.data_type() else {
231 return InvalidMapSchemaSnafu {
232 field_name: field.name(),
233 reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
234 }
235 .fail();
236 };
237
238 ensure!(
239 fields.len() == 2,
240 InvalidMapSchemaSnafu {
241 field_name: field.name(),
242 reason: format!("expected 2 fields (key, value), found {}", fields.len()),
243 },
244 );
245 let key_field = &fields[0];
246 let value_field = &fields[1];
247
248 let new_struct_fields: Fields =
249 [key_field.clone(), make_field_nullable(value_field)?.into()].into();
250
251 let new_inner_field = inner
254 .as_ref()
255 .clone()
256 .with_data_type(DataType::Struct(new_struct_fields))
257 .with_nullable(false);
258
259 DataType::Map(new_inner_field.into(), *sorted)
260 }
261 other => other.clone(),
262 };
263
264 Ok(field
265 .clone()
266 .with_data_type(new_data_type)
267 .with_nullable(true))
268}
269
270pub fn find_null_non_nullable_fields<'a>(
273 schema: &'a Schema,
274 values: &[&vrl::value::Value],
275) -> Vec<&'a str> {
276 schema
277 .fields()
278 .iter()
279 .filter(|field| {
280 !field.is_nullable()
281 && values.iter().any(|value| {
282 value
283 .as_object()
284 .and_then(|map| map.get(field.name().as_str()))
285 .is_none_or(vrl::value::Value::is_null)
286 })
287 })
288 .map(|field| field.name().as_str())
289 .collect()
290}
291
292fn build_record_batch(
294 schema: SchemaRef,
295 events: &[Event],
296) -> Result<RecordBatch, ArrowEncodingError> {
297 let values: Vec<_> = events
298 .iter()
299 .filter_map(Event::maybe_as_log)
300 .map(|log| log.value())
301 .collect();
302
303 if values.is_empty() {
304 return Err(ArrowEncodingError::NoEvents);
305 }
306
307 let missing = find_null_non_nullable_fields(&schema, &values);
308 if !missing.is_empty() {
309 for field_name in &missing {
310 let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
311 field_name: field_name.to_string(),
312 });
313 vector_common::internal_event::emit(
314 crate::internal_events::EncoderNullConstraintError { error: &error },
315 );
316 }
317 return Err(ArrowEncodingError::NullConstraint {
318 field_name: missing.join(", "),
319 });
320 }
321
322 let mut decoder = ReaderBuilder::new(schema)
323 .build_decoder()
324 .context(RecordBatchCreationSnafu)?;
325
326 decoder.serialize(&values).context(ArrowJsonDecodeSnafu)?;
327
328 decoder
329 .flush()
330 .context(ArrowJsonDecodeSnafu)?
331 .ok_or(ArrowEncodingError::NoEvents)
332}
333
334#[cfg(test)]
335mod tests {
336 use super::*;
337 use arrow::{
338 array::{Array, AsArray},
339 datatypes::TimeUnit,
340 ipc::reader::StreamReader,
341 };
342 use chrono::Utc;
343 use std::io::Cursor;
344 use vector_core::event::{LogEvent, Value};
345
346 fn encode_and_decode(
348 events: Vec<Event>,
349 schema: SchemaRef,
350 ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
351 let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
352 let cursor = Cursor::new(bytes);
353 let mut reader = StreamReader::try_new(cursor, None)?;
354 Ok(reader.next().unwrap()?)
355 }
356
357 fn create_event<V>(fields: Vec<(&str, V)>) -> Event
359 where
360 V: Into<Value>,
361 {
362 let mut log = LogEvent::default();
363 for (key, value) in fields {
364 log.insert(key, value.into());
365 }
366 Event::Log(log)
367 }
368
mod comprehensive {
    use super::*;

    /// Round-trips one log event that carries a value for every column type the
    /// test schema declares, then checks each decoded column.
    #[test]
    fn test_encode_all_types() {
        use arrow::datatypes::{
            Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
            Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
        };
        use vrl::value::ObjectMap;

        let now = Utc::now();

        // ---- Schema -----------------------------------------------------------
        let positional_struct = Fields::from(vec![
            Field::new("f0", DataType::Utf8, true),
            Field::new("f1", DataType::Int64, true),
        ]);

        let labelled_struct = Fields::from(vec![
            Field::new("category", DataType::Utf8, true),
            Field::new("tag", DataType::Utf8, true),
        ]);

        let entries = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int64, true),
            ])),
            false,
        );

        let schema: SchemaRef = Schema::new(vec![
            Field::new("string_field", DataType::Utf8, true),
            Field::new("int8_field", DataType::Int8, true),
            Field::new("int16_field", DataType::Int16, true),
            Field::new("int32_field", DataType::Int32, true),
            Field::new("int64_field", DataType::Int64, true),
            Field::new("uint8_field", DataType::UInt8, true),
            Field::new("uint16_field", DataType::UInt16, true),
            Field::new("uint32_field", DataType::UInt32, true),
            Field::new("uint64_field", DataType::UInt64, true),
            Field::new("float32_field", DataType::Float32, true),
            Field::new("float64_field", DataType::Float64, true),
            Field::new("bool_field", DataType::Boolean, true),
            Field::new(
                "timestamp_field",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new("decimal_field", DataType::Decimal128(10, 2), true),
            Field::new(
                "list_field",
                DataType::List(Field::new("item", DataType::Int64, true).into()),
                true,
            ),
            Field::new("struct_field", DataType::Struct(positional_struct), true),
            Field::new(
                "named_struct_field",
                DataType::Struct(labelled_struct),
                true,
            ),
            Field::new("map_field", DataType::Map(entries.into(), false), true),
        ])
        .into();

        // ---- Nested input values ----------------------------------------------
        let mut positional = ObjectMap::new();
        positional.insert("f0".into(), Value::Bytes("nested_str".into()));
        positional.insert("f1".into(), Value::Integer(999));

        let mut labelled = ObjectMap::new();
        labelled.insert("category".into(), Value::Bytes("test_category".into()));
        labelled.insert("tag".into(), Value::Bytes("test_tag".into()));

        let numbers = Value::Array(vec![
            Value::Integer(1),
            Value::Integer(2),
            Value::Integer(3),
        ]);

        let mut lookup = ObjectMap::new();
        lookup.insert("key1".into(), Value::Integer(100));
        lookup.insert("key2".into(), Value::Integer(200));

        // ---- Event ------------------------------------------------------------
        let mut log = LogEvent::default();
        log.insert("string_field", "test");
        log.insert("int8_field", 127);
        log.insert("int16_field", 32000);
        log.insert("int32_field", 1000000);
        log.insert("int64_field", 42);
        log.insert("uint8_field", 255);
        log.insert("uint16_field", 65535);
        log.insert("uint32_field", 4000000);
        log.insert("uint64_field", 9000000000_i64);
        log.insert("float32_field", 3.15);
        log.insert("float64_field", 3.15);
        log.insert("bool_field", true);
        log.insert("timestamp_field", now);
        log.insert("decimal_field", 99.99);
        log.insert("list_field", numbers);
        log.insert("struct_field", Value::Object(positional));
        log.insert("named_struct_field", Value::Object(labelled));
        log.insert("map_field", Value::Object(lookup));

        let batch = encode_and_decode(vec![Event::Log(log)], schema).expect("Failed to encode");

        // ---- Shape ------------------------------------------------------------
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 18);

        // ---- Scalar columns ---------------------------------------------------
        assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
        assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
        assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
        assert_eq!(
            batch.column(3).as_primitive::<Int32Type>().value(0),
            1000000
        );
        assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
        assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
        assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
        assert_eq!(
            batch.column(7).as_primitive::<UInt32Type>().value(0),
            4000000
        );
        assert_eq!(
            batch.column(8).as_primitive::<UInt64Type>().value(0),
            9000000000
        );
        assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
        assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
        assert!(batch.column(11).as_boolean().value(0));
        assert_eq!(
            batch
                .column(12)
                .as_primitive::<TimestampMillisecondType>()
                .value(0),
            now.timestamp_millis()
        );
        // 99.99 with scale 2 is stored as the unscaled integer 9999.
        assert_eq!(
            batch.column(13).as_primitive::<Decimal128Type>().value(0),
            9999
        );

        // ---- List column ------------------------------------------------------
        let lists = batch.column(14).as_list::<i32>();
        assert!(!lists.is_null(0));
        let first_list = lists.value(0);
        assert_eq!(first_list.len(), 3);
        let items = first_list.as_primitive::<Int64Type>();
        assert_eq!(items.value(0), 1);
        assert_eq!(items.value(1), 2);
        assert_eq!(items.value(2), 3);

        // ---- Struct columns ---------------------------------------------------
        let positional_col = batch.column(15).as_struct();
        assert!(!positional_col.is_null(0));
        assert_eq!(
            positional_col.column(0).as_string::<i32>().value(0),
            "nested_str"
        );
        assert_eq!(
            positional_col.column(1).as_primitive::<Int64Type>().value(0),
            999
        );

        let labelled_col = batch.column(16).as_struct();
        assert!(!labelled_col.is_null(0));
        assert_eq!(
            labelled_col.column(0).as_string::<i32>().value(0),
            "test_category"
        );
        assert_eq!(
            labelled_col.column(1).as_string::<i32>().value(0),
            "test_tag"
        );

        // ---- Map column -------------------------------------------------------
        let maps = batch.column(17).as_map();
        assert!(!maps.is_null(0));
        let first_map = maps.value(0);
        assert_eq!(first_map.len(), 2);
    }
}
561
mod error_handling {
    use super::*;

    /// An empty batch is rejected with `NoEvents`.
    #[test]
    fn test_encode_empty_events() {
        let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
        let events: Vec<Event> = vec![];
        let result = encode_events_to_arrow_ipc_stream(&events, schema);
        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
    }

    /// An event lacking a non-nullable field is rejected by the null-constraint
    /// check specifically, not by some unrelated failure.
    #[test]
    fn test_missing_non_nullable_field_errors() {
        let events = vec![create_event(vec![("other_field", "value")])];

        let schema = Schema::new(vec![Field::new("required_field", DataType::Utf8, false)]).into();

        let result = encode_events_to_arrow_ipc_stream(&events, schema);
        // Pin the variant: a bare `is_err()` would also pass if encoding broke for
        // a completely different reason.
        assert!(matches!(
            result.unwrap_err(),
            ArrowEncodingError::NullConstraint { .. }
        ));
    }
}
588
589 mod temporal_types {
590 use super::*;
591 use arrow::datatypes::{
592 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
593 TimestampSecondType,
594 };
595
596 #[test]
597 fn test_encode_timestamp_precisions() {
598 let now = Utc::now();
599 let mut log = LogEvent::default();
600 log.insert("ts_second", now);
601 log.insert("ts_milli", now);
602 log.insert("ts_micro", now);
603 log.insert("ts_nano", now);
604
605 let events = vec![Event::Log(log)];
606
607 let schema = Schema::new(vec![
608 Field::new(
609 "ts_second",
610 DataType::Timestamp(TimeUnit::Second, None),
611 true,
612 ),
613 Field::new(
614 "ts_milli",
615 DataType::Timestamp(TimeUnit::Millisecond, None),
616 true,
617 ),
618 Field::new(
619 "ts_micro",
620 DataType::Timestamp(TimeUnit::Microsecond, None),
621 true,
622 ),
623 Field::new(
624 "ts_nano",
625 DataType::Timestamp(TimeUnit::Nanosecond, None),
626 true,
627 ),
628 ])
629 .into();
630
631 let batch = encode_and_decode(events, schema).unwrap();
632
633 assert_eq!(batch.num_rows(), 1);
634 assert_eq!(batch.num_columns(), 4);
635
636 let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
637 assert!(!ts_second.is_null(0));
638 assert_eq!(ts_second.value(0), now.timestamp());
639
640 let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
641 assert!(!ts_milli.is_null(0));
642 assert_eq!(ts_milli.value(0), now.timestamp_millis());
643
644 let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
645 assert!(!ts_micro.is_null(0));
646 assert_eq!(ts_micro.value(0), now.timestamp_micros());
647
648 let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
649 assert!(!ts_nano.is_null(0));
650 assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
651 }
652
653 #[test]
654 fn test_encode_mixed_timestamp_string_native_and_integer() {
655 let now = Utc::now();
656
657 let mut log1 = LogEvent::default();
658 log1.insert("ts", "2025-10-22T10:18:44.256Z"); let mut log2 = LogEvent::default();
661 log2.insert("ts", now); let mut log3 = LogEvent::default();
664 log3.insert("ts", 1729594724256000000_i64); let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
667
668 let schema = Schema::new(vec![Field::new(
669 "ts",
670 DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
671 true,
672 )])
673 .into();
674
675 let batch = encode_and_decode(events, schema).unwrap();
676
677 assert_eq!(batch.num_rows(), 3);
678
679 let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();
680
681 assert!(!ts_array.is_null(0));
683 assert!(!ts_array.is_null(1));
684 assert!(!ts_array.is_null(2));
685
686 let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
688 .unwrap()
689 .timestamp_nanos_opt()
690 .unwrap();
691 assert_eq!(ts_array.value(0), expected);
692
693 assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());
695
696 assert_eq!(ts_array.value(2), 1729594724256000000_i64);
698 }
699 }
700
701 mod config_tests {
702 use super::*;
703 use tokio_util::codec::Encoder;
704
705 #[test]
706 fn test_config_allow_nullable_fields_overrides_schema() {
707 let mut log1 = LogEvent::default();
708 log1.insert("strict_field", 42);
709 let log2 = LogEvent::default();
710 let events = vec![Event::Log(log1), Event::Log(log2)];
711
712 let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
713
714 let mut config = ArrowStreamSerializerConfig::new(schema);
715 config.allow_nullable_fields = true;
716
717 let mut serializer =
718 ArrowStreamSerializer::new(config).expect("Failed to create serializer");
719
720 let mut buffer = BytesMut::new();
721 serializer
722 .encode(events, &mut buffer)
723 .expect("Encoding should succeed when allow_nullable_fields is true");
724
725 let cursor = Cursor::new(buffer);
726 let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
727 let batch = reader.next().unwrap().expect("Failed to read batch");
728
729 assert_eq!(batch.num_rows(), 2);
730
731 let binding = batch.schema();
732 let output_field = binding.field(0);
733 assert!(
734 output_field.is_nullable(),
735 "The output schema field should have been transformed to nullable=true"
736 );
737
738 let array = batch
739 .column(0)
740 .as_primitive::<arrow::datatypes::Int64Type>();
741
742 assert_eq!(array.value(0), 42);
743 assert!(!array.is_null(0));
744 assert!(
745 array.is_null(1),
746 "The missing value should be encoded as null"
747 );
748 }
749
750 #[test]
751 fn test_make_field_nullable_with_nested_types() {
752 let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
753 let inner_struct =
754 DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
755 let list_field = Field::new("item", inner_struct, false);
756 let list_type = DataType::List(list_field.into());
757 let outer_field = Field::new("inner_list", list_type, false);
758 let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));
759
760 let original_field = Field::new("root", outer_struct, false);
761 let nullable_field = make_field_nullable(&original_field).unwrap();
762
763 assert!(
764 nullable_field.is_nullable(),
765 "Root field should be nullable"
766 );
767
768 if let DataType::Struct(root_fields) = nullable_field.data_type() {
769 let inner_list_field = &root_fields[0];
770 assert!(inner_list_field.is_nullable());
771
772 if let DataType::List(list_item_field) = inner_list_field.data_type() {
773 assert!(list_item_field.is_nullable());
774
775 if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
776 let nested_field = &inner_struct_fields[0];
777 assert!(nested_field.is_nullable());
778 } else {
779 panic!("Expected Struct type for list items");
780 }
781 } else {
782 panic!("Expected List type for inner_list");
783 }
784 } else {
785 panic!("Expected Struct type for root field");
786 }
787 }
788
789 #[test]
790 fn test_make_field_nullable_with_map_type() {
791 let key_field = Field::new("key", DataType::Utf8, false);
792 let value_field = Field::new("value", DataType::Int64, false);
793 let entries_struct =
794 DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
795 let entries_field = Field::new("entries", entries_struct, false);
796 let map_type = DataType::Map(entries_field.into(), false);
797
798 let original_field = Field::new("my_map", map_type, false);
799 let nullable_field = make_field_nullable(&original_field).unwrap();
800
801 assert!(
802 nullable_field.is_nullable(),
803 "Root map field should be nullable"
804 );
805
806 if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
807 assert!(
808 !entries_field.is_nullable(),
809 "Map entries field should be non-nullable"
810 );
811
812 if let DataType::Struct(struct_fields) = entries_field.data_type() {
813 let key_field = &struct_fields[0];
814 let value_field = &struct_fields[1];
815 assert!(
816 !key_field.is_nullable(),
817 "Map key field should be non-nullable"
818 );
819 assert!(
820 value_field.is_nullable(),
821 "Map value field should be nullable"
822 );
823 } else {
824 panic!("Expected Struct type for map entries");
825 }
826 } else {
827 panic!("Expected Map type for my_map field");
828 }
829 }
830 }
831
mod null_non_nullable {
    use super::*;

    /// A field that is absent from the event is named in the error; nullable
    /// fields are not.
    #[test]
    fn test_missing_non_nullable_field_error_names_fields() {
        let schema: SchemaRef = Schema::new(vec![
            Field::new("required_field", DataType::Utf8, false),
            Field::new("optional_field", DataType::Utf8, true),
        ])
        .into();

        let event = create_event(vec![("optional_field", "hello")]);

        let result = encode_events_to_arrow_ipc_stream(&[event], schema);
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("required_field"),
            "Error should name the missing field, got: {err}"
        );
        assert!(
            !err.contains("optional_field"),
            "Error should not name nullable fields, got: {err}"
        );
    }

    /// A field that is present but explicitly null is named in the error, while a
    /// populated non-nullable field is not.
    #[test]
    fn test_null_value_in_non_nullable_field_error_names_fields() {
        let schema: SchemaRef = Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ])
        .into();

        // `name` is supplied as an explicit null so this covers the null-value path
        // rather than repeating the missing-field case.
        let event = create_event(vec![("id", Value::Integer(1)), ("name", Value::Null)]);

        let result = encode_events_to_arrow_ipc_stream(&[event], schema);
        let err = result.unwrap_err().to_string();
        // Field names are quoted in the message; matching the quoted form avoids
        // accidental substring hits.
        assert!(
            err.contains("'name'"),
            "Error should name the null field, got: {err}"
        );
        assert!(
            !err.contains("'id'"),
            "Error should not name populated fields, got: {err}"
        );
    }

    /// No field is reported when every non-nullable field has a value.
    #[test]
    fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Utf8, false),
            Field::new("b", DataType::Int64, false),
        ]);

        let event = create_event(vec![
            ("a", Value::Bytes("val".into())),
            ("b", Value::Integer(42)),
        ]);
        let value = event.as_log().value();
        let missing = find_null_non_nullable_fields(&schema, &[value]);
        assert!(
            missing.is_empty(),
            "Expected no missing fields, got: {missing:?}"
        );
    }

    /// An explicit null counts as a violation of a non-nullable field.
    #[test]
    fn test_find_null_non_nullable_fields_detects_explicit_null() {
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);

        let event = create_event(vec![("a", Value::Null)]);
        let value = event.as_log().value();
        let missing = find_null_non_nullable_fields(&schema, &[value]);
        assert_eq!(missing, vec!["a"]);
    }
}
906}