1use arrow::{
8 array::{
9 ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder, Decimal256Builder,
10 Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder,
11 StringBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
12 TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder, UInt16Builder,
13 UInt32Builder, UInt64Builder,
14 },
15 datatypes::{DataType, Schema, TimeUnit, i256},
16 ipc::writer::StreamWriter,
17 record_batch::RecordBatch,
18};
19use async_trait::async_trait;
20use bytes::{BufMut, Bytes, BytesMut};
21use chrono::{DateTime, Utc};
22use rust_decimal::Decimal;
23use snafu::Snafu;
24use std::sync::Arc;
25use vector_config::configurable_component;
26
27use vector_core::event::{Event, Value};
28
/// Source of the Arrow [`Schema`] used for encoding.
///
/// Async so implementations may fetch the schema from a remote service;
/// failures surface as [`ArrowEncodingError`].
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Returns the schema to encode events against.
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
40
/// Configuration for the Arrow IPC stream serializer.
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema to encode events against.
    ///
    /// Skipped by serde: the schema is supplied programmatically (see
    /// [`ArrowStreamSerializerConfig::new`]), not parsed from user config.
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// When `true`, every schema field (recursively) is rewritten as nullable
    /// before encoding, so missing event fields become nulls instead of errors.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
62
63impl std::fmt::Debug for ArrowStreamSerializerConfig {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 f.debug_struct("ArrowStreamSerializerConfig")
66 .field(
67 "schema",
68 &self
69 .schema
70 .as_ref()
71 .map(|s| format!("{} fields", s.fields().len())),
72 )
73 .field("allow_nullable_fields", &self.allow_nullable_fields)
74 .finish()
75 }
76}
77
78impl ArrowStreamSerializerConfig {
79 pub fn new(schema: arrow::datatypes::Schema) -> Self {
81 Self {
82 schema: Some(schema),
83 allow_nullable_fields: false,
84 }
85 }
86
87 pub fn input_type(&self) -> vector_core::config::DataType {
89 vector_core::config::DataType::Log
90 }
91
92 pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
94 vector_core::schema::Requirement::empty()
95 }
96}
97
/// Serializer that encodes batches of log events as Arrow IPC streams.
///
/// Cheap to clone: the (possibly nullable-rewritten) schema is shared via `Arc`.
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    schema: Arc<Schema>,
}
103
104impl ArrowStreamSerializer {
105 pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
107 let schema = config
108 .schema
109 .ok_or_else(|| vector_common::Error::from("Arrow serializer requires a schema."))?;
110
111 let schema = if config.allow_nullable_fields {
114 Schema::new_with_metadata(
115 schema
116 .fields()
117 .iter()
118 .map(|f| Arc::new(make_field_nullable(f)))
119 .collect::<Vec<_>>(),
120 schema.metadata().clone(),
121 )
122 } else {
123 schema
124 };
125
126 Ok(Self {
127 schema: Arc::new(schema),
128 })
129 }
130}
131
132impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
133 type Error = ArrowEncodingError;
134
135 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
136 if events.is_empty() {
137 return Err(ArrowEncodingError::NoEvents);
138 }
139
140 let bytes = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&self.schema)))?;
141
142 buffer.extend_from_slice(&bytes);
143 Ok(())
144 }
145}
146
/// Errors that can occur while encoding events into Arrow IPC data.
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// Column construction or batch assembly failed inside the arrow crate.
    #[snafu(display("Failed to create Arrow record batch: {}", source))]
    RecordBatchCreation {
        source: arrow::error::ArrowError,
    },

    /// Writing the IPC stream (schema header, batch, or end marker) failed.
    #[snafu(display("Failed to write Arrow IPC data: {}", source))]
    IpcWrite {
        source: arrow::error::ArrowError,
    },

    /// The input event batch was empty.
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// No schema was supplied to the encoder.
    #[snafu(display("Schema must be provided before encoding"))]
    NoSchemaProvided,

    /// A [`SchemaProvider`] failed to deliver a schema.
    #[snafu(display("Failed to fetch schema from provider: {}", message))]
    SchemaFetchError {
        message: String,
    },

    /// The schema contains a data type this encoder cannot build columns for.
    #[snafu(display(
        "Unsupported Arrow data type for field '{}': {:?}",
        field_name,
        data_type
    ))]
    UnsupportedType {
        field_name: String,
        data_type: DataType,
    },

    /// A missing/unconvertible value was encountered for a non-nullable field.
    #[snafu(display("Null value for non-nullable field '{}'", field_name))]
    NullConstraint {
        field_name: String,
    },

    /// An underlying I/O error.
    #[snafu(display("IO error: {}", source))]
    Io {
        source: std::io::Error,
    },
}
206
/// Allows `?` on `std::io::Error` results inside encoding paths.
impl From<std::io::Error> for ArrowEncodingError {
    fn from(error: std::io::Error) -> Self {
        Self::Io { source: error }
    }
}
212
213pub fn encode_events_to_arrow_ipc_stream(
215 events: &[Event],
216 schema: Option<Arc<Schema>>,
217) -> Result<Bytes, ArrowEncodingError> {
218 if events.is_empty() {
219 return Err(ArrowEncodingError::NoEvents);
220 }
221
222 let schema_ref = schema.ok_or(ArrowEncodingError::NoSchemaProvided)?;
223
224 let record_batch = build_record_batch(schema_ref, events)?;
225
226 let ipc_err = |source| ArrowEncodingError::IpcWrite { source };
227
228 let mut buffer = BytesMut::new().writer();
229 let mut writer =
230 StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).map_err(ipc_err)?;
231 writer.write(&record_batch).map_err(ipc_err)?;
232 writer.finish().map_err(ipc_err)?;
233
234 Ok(buffer.into_inner().freeze())
235}
236
237fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
239 let new_data_type = match field.data_type() {
240 DataType::List(inner_field) => DataType::List(Arc::new(make_field_nullable(inner_field))),
241 DataType::Struct(fields) => {
242 DataType::Struct(fields.iter().map(|f| make_field_nullable(f)).collect())
243 }
244 DataType::Map(inner_field, sorted) => {
245 DataType::Map(Arc::new(make_field_nullable(inner_field)), *sorted)
246 }
247 other => other.clone(),
248 };
249
250 field
251 .clone()
252 .with_data_type(new_data_type)
253 .with_nullable(true)
254}
255
/// Builds a single [`RecordBatch`]: one column per schema field, with values
/// extracted from each log event by field name.
///
/// # Errors
/// Returns [`ArrowEncodingError::UnsupportedType`] for any schema data type
/// without a column builder below; per-column null-constraint and conversion
/// errors are propagated from the builders.
fn build_record_batch(
    schema: Arc<Schema>,
    events: &[Event],
) -> Result<RecordBatch, ArrowEncodingError> {
    let num_fields = schema.fields().len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_fields);

    // Dispatch each field to the builder matching its Arrow data type.
    for field in schema.fields() {
        let field_name = field.name();
        let nullable = field.is_nullable();
        let array: ArrayRef = match field.data_type() {
            DataType::Timestamp(time_unit, _) => {
                build_timestamp_array(events, field_name, *time_unit, nullable)?
            }
            DataType::Utf8 => build_string_array(events, field_name, nullable)?,
            DataType::Int8 => build_int8_array(events, field_name, nullable)?,
            DataType::Int16 => build_int16_array(events, field_name, nullable)?,
            DataType::Int32 => build_int32_array(events, field_name, nullable)?,
            DataType::Int64 => build_int64_array(events, field_name, nullable)?,
            DataType::UInt8 => build_uint8_array(events, field_name, nullable)?,
            DataType::UInt16 => build_uint16_array(events, field_name, nullable)?,
            DataType::UInt32 => build_uint32_array(events, field_name, nullable)?,
            DataType::UInt64 => build_uint64_array(events, field_name, nullable)?,
            DataType::Float32 => build_float32_array(events, field_name, nullable)?,
            DataType::Float64 => build_float64_array(events, field_name, nullable)?,
            DataType::Boolean => build_boolean_array(events, field_name, nullable)?,
            DataType::Binary => build_binary_array(events, field_name, nullable)?,
            DataType::Decimal128(precision, scale) => {
                build_decimal128_array(events, field_name, *precision, *scale, nullable)?
            }
            DataType::Decimal256(precision, scale) => {
                build_decimal256_array(events, field_name, *precision, *scale, nullable)?
            }
            other_type => {
                return Err(ArrowEncodingError::UnsupportedType {
                    field_name: field_name.into(),
                    data_type: other_type.clone(),
                });
            }
        };

        columns.push(array);
    }

    RecordBatch::try_new(schema, columns)
        .map_err(|source| ArrowEncodingError::RecordBatchCreation { source })
}
304
// Appends a null to `$builder` when the field is nullable; otherwise performs
// an early `return` from the *enclosing function* with a `NullConstraint`
// error. Only usable inside fns returning `Result<_, ArrowEncodingError>`.
macro_rules! handle_null_constraints {
    ($builder:expr, $nullable:expr, $field_name:expr) => {{
        if !$nullable {
            return Err(ArrowEncodingError::NullConstraint {
                field_name: $field_name.into(),
            });
        }
        $builder.append_null();
    }};
}
316
// Generates `fn $fn_name(events, field_name, nullable) -> Result<ArrayRef, _>`
// filling a `$builder_ty` column. Each log event's field value is matched
// against the supplied patterns (with optional range guards); matches append
// `$append_expr`, and anything else — missing field, wrong type, failed
// guard — goes through `handle_null_constraints!` (null or error).
// Non-log events contribute no row at all.
macro_rules! define_build_primitive_array_fn {
    (
        $fn_name:ident, $builder_ty:ty, $( $value_pat:pat $(if $guard:expr)? => $append_expr:expr ),+
    ) => {
        fn $fn_name(
            events: &[Event],
            field_name: &str,
            nullable: bool,
        ) -> Result<ArrayRef, ArrowEncodingError> {
            let mut builder = <$builder_ty>::with_capacity(events.len());

            for event in events {
                if let Event::Log(log) = event {
                    match log.get(field_name) {
                        $(
                            $value_pat $(if $guard)? => builder.append_value($append_expr),
                        )+
                        _ => handle_null_constraints!(builder, nullable, field_name),
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    };
}
347
348fn extract_timestamp(value: &Value) -> Option<DateTime<Utc>> {
349 match value {
350 Value::Timestamp(ts) => Some(*ts),
351 Value::Bytes(bytes) => std::str::from_utf8(bytes)
352 .ok()
353 .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
354 .map(|dt| dt.with_timezone(&Utc)),
355 _ => None,
356 }
357}
358
/// Builds a timestamp column for `field_name` at the requested `time_unit`.
///
/// Values are accepted as native timestamps or RFC 3339 strings (via
/// `extract_timestamp`), or as raw integers — integers are appended verbatim,
/// so they are assumed to already be expressed in the target time unit.
fn build_timestamp_array(
    events: &[Event],
    field_name: &str,
    time_unit: TimeUnit,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    // Shared builder loop, instantiated once per builder/converter pair below.
    macro_rules! build_array {
        ($builder:ty, $converter:expr) => {{
            let mut builder = <$builder>::with_capacity(events.len());
            for event in events {
                if let Event::Log(log) = event {
                    let value_to_append = log.get(field_name).and_then(|value| {
                        if let Some(ts) = extract_timestamp(value) {
                            $converter(&ts)
                        }
                        else if let Value::Integer(i) = value {
                            // Raw integer: pass through untouched.
                            Some(*i)
                        }
                        else {
                            None
                        }
                    });

                    // Missing/unconvertible values are only allowed for
                    // nullable fields; otherwise fail the whole batch.
                    if value_to_append.is_none() && !nullable {
                        return Err(ArrowEncodingError::NullConstraint {
                            field_name: field_name.into(),
                        });
                    }

                    builder.append_option(value_to_append);
                }
            }
            Ok(Arc::new(builder.finish()))
        }};
    }

    match time_unit {
        TimeUnit::Second => {
            build_array!(TimestampSecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp()
            ))
        }
        TimeUnit::Millisecond => {
            build_array!(TimestampMillisecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp_millis()
            ))
        }
        TimeUnit::Microsecond => {
            build_array!(TimestampMicrosecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp_micros()
            ))
        }
        TimeUnit::Nanosecond => {
            // `timestamp_nanos_opt` is None outside the representable
            // nanosecond range; such values encode as null (or error above).
            build_array!(TimestampNanosecondBuilder, |ts: &DateTime<Utc>| ts
                .timestamp_nanos_opt())
        }
    }
}
420
421fn build_string_array(
422 events: &[Event],
423 field_name: &str,
424 nullable: bool,
425) -> Result<ArrayRef, ArrowEncodingError> {
426 let mut builder = StringBuilder::with_capacity(events.len(), 0);
427
428 for event in events {
429 if let Event::Log(log) = event {
430 let mut appended = false;
431 if let Some(value) = log.get(field_name) {
432 match value {
433 Value::Bytes(bytes) => {
434 match std::str::from_utf8(bytes) {
436 Ok(s) => builder.append_value(s),
437 Err(_) => builder.append_value(&String::from_utf8_lossy(bytes)),
438 }
439 appended = true;
440 }
441 Value::Object(obj) => {
442 if let Ok(s) = serde_json::to_string(&obj) {
443 builder.append_value(s);
444 appended = true;
445 }
446 }
447 Value::Array(arr) => {
448 if let Ok(s) = serde_json::to_string(&arr) {
449 builder.append_value(s);
450 appended = true;
451 }
452 }
453 _ => {
454 builder.append_value(&value.to_string_lossy());
455 appended = true;
456 }
457 }
458 }
459
460 if !appended {
461 handle_null_constraints!(builder, nullable, field_name);
462 }
463 }
464 }
465
466 Ok(Arc::new(builder.finish()))
467}
468
// Generated primitive-column builders. Narrower-than-i64 integer targets carry
// range guards, so out-of-range values fall through to the null/error path
// instead of being truncated.

define_build_primitive_array_fn!(
    build_int8_array,
    Int8Builder,
    Some(Value::Integer(i)) if *i >= i8::MIN as i64 && *i <= i8::MAX as i64 => *i as i8
);

define_build_primitive_array_fn!(
    build_int16_array,
    Int16Builder,
    Some(Value::Integer(i)) if *i >= i16::MIN as i64 && *i <= i16::MAX as i64 => *i as i16
);

define_build_primitive_array_fn!(
    build_int32_array,
    Int32Builder,
    Some(Value::Integer(i)) if *i >= i32::MIN as i64 && *i <= i32::MAX as i64 => *i as i32
);

define_build_primitive_array_fn!(
    build_int64_array,
    Int64Builder,
    Some(Value::Integer(i)) => *i
);

define_build_primitive_array_fn!(
    build_uint8_array,
    UInt8Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u8::MAX as i64 => *i as u8
);

define_build_primitive_array_fn!(
    build_uint16_array,
    UInt16Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u16::MAX as i64 => *i as u16
);

define_build_primitive_array_fn!(
    build_uint32_array,
    UInt32Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u32::MAX as i64 => *i as u32
);

// Only non-negative values are accepted; values above i64::MAX are not
// representable in `Value::Integer` to begin with.
define_build_primitive_array_fn!(
    build_uint64_array,
    UInt64Builder,
    Some(Value::Integer(i)) if *i >= 0 => *i as u64
);

// Float targets also accept integers, converting them to the float width.
// The f64 -> f32 cast may lose precision by design.
define_build_primitive_array_fn!(
    build_float32_array,
    Float32Builder,
    Some(Value::Float(f)) => f.into_inner() as f32,
    Some(Value::Integer(i)) => *i as f32
);

define_build_primitive_array_fn!(
    build_float64_array,
    Float64Builder,
    Some(Value::Float(f)) => f.into_inner(),
    Some(Value::Integer(i)) => *i as f64
);

define_build_primitive_array_fn!(
    build_boolean_array,
    BooleanBuilder,
    Some(Value::Boolean(b)) => *b
);
536
537fn build_binary_array(
538 events: &[Event],
539 field_name: &str,
540 nullable: bool,
541) -> Result<ArrayRef, ArrowEncodingError> {
542 let mut builder = BinaryBuilder::with_capacity(events.len(), 0);
543
544 for event in events {
545 if let Event::Log(log) = event {
546 match log.get(field_name) {
547 Some(Value::Bytes(bytes)) => builder.append_value(bytes),
548 _ => handle_null_constraints!(builder, nullable, field_name),
549 }
550 }
551 }
552
553 Ok(Arc::new(builder.finish()))
554}
555
/// Builds a `Decimal128` column for `field_name` at the given precision/scale,
/// converting floats and integers through `rust_decimal`.
fn build_decimal128_array(
    events: &[Event],
    field_name: &str,
    precision: u8,
    scale: i8,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    let mut builder = Decimal128Builder::with_capacity(events.len())
        .with_precision_and_scale(precision, scale)
        // Invalid precision/scale combinations surface as UnsupportedType.
        .map_err(|_| ArrowEncodingError::UnsupportedType {
            field_name: field_name.into(),
            data_type: DataType::Decimal128(precision, scale),
        })?;

    // NOTE(review): a negative Arrow scale is folded to its absolute value
    // here — confirm that matches the intended negative-scale semantics.
    let target_scale = scale.unsigned_abs() as u32;

    for event in events {
        if let Event::Log(log) = event {
            let mut appended = false;
            match log.get(field_name) {
                Some(Value::Float(f)) => {
                    // NaN/infinite floats fail the conversion and fall through
                    // to the null-handling path below.
                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
                        // Rescale so the mantissa is expressed at the column's
                        // scale before storing it as the raw i128 value.
                        decimal.rescale(target_scale);
                        let mantissa = decimal.mantissa();
                        builder.append_value(mantissa);
                        appended = true;
                    }
                }
                Some(Value::Integer(i)) => {
                    let mut decimal = Decimal::from(*i);
                    decimal.rescale(target_scale);
                    let mantissa = decimal.mantissa();
                    builder.append_value(mantissa);
                    appended = true;
                }
                _ => {}
            }

            if !appended {
                handle_null_constraints!(builder, nullable, field_name);
            }
        }
    }

    Ok(Arc::new(builder.finish()))
}
602
/// Builds a `Decimal256` column for `field_name` at the given precision/scale.
///
/// Values are converted via `rust_decimal` and widened with
/// `i256::from_i128`, so the usable range is bounded by `rust_decimal`'s
/// mantissa, not the full 256-bit range.
fn build_decimal256_array(
    events: &[Event],
    field_name: &str,
    precision: u8,
    scale: i8,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    let mut builder = Decimal256Builder::with_capacity(events.len())
        .with_precision_and_scale(precision, scale)
        // Invalid precision/scale combinations surface as UnsupportedType.
        .map_err(|_| ArrowEncodingError::UnsupportedType {
            field_name: field_name.into(),
            data_type: DataType::Decimal256(precision, scale),
        })?;

    // NOTE(review): a negative Arrow scale is folded to its absolute value
    // here — confirm that matches the intended negative-scale semantics.
    let target_scale = scale.unsigned_abs() as u32;

    for event in events {
        if let Event::Log(log) = event {
            let mut appended = false;
            match log.get(field_name) {
                Some(Value::Float(f)) => {
                    // NaN/infinite floats fail the conversion and fall through
                    // to the null-handling path below.
                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
                        decimal.rescale(target_scale);
                        let mantissa = decimal.mantissa();
                        builder.append_value(i256::from_i128(mantissa));
                        appended = true;
                    }
                }
                Some(Value::Integer(i)) => {
                    let mut decimal = Decimal::from(*i);
                    decimal.rescale(target_scale);
                    let mantissa = decimal.mantissa();
                    builder.append_value(i256::from_i128(mantissa));
                    appended = true;
                }
                _ => {}
            }

            if !appended {
                handle_null_constraints!(builder, nullable, field_name);
            }
        }
    }

    Ok(Arc::new(builder.finish()))
}
650
651#[cfg(test)]
652mod tests {
653 use super::*;
654 use arrow::{
655 array::{
656 Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
657 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
658 TimestampSecondArray,
659 },
660 datatypes::Field,
661 ipc::reader::StreamReader,
662 };
663 use chrono::Utc;
664 use std::io::Cursor;
665 use vector_core::event::LogEvent;
666
    // Round trip covering every supported scalar type: encode one event,
    // decode with the IPC stream reader, and verify each column's value.
    #[test]
    fn test_encode_all_types() {
        let mut log = LogEvent::default();
        log.insert("string_field", "test");
        log.insert("int8_field", 127);
        log.insert("int16_field", 32000);
        log.insert("int32_field", 1000000);
        log.insert("int64_field", 42);
        log.insert("float32_field", 3.15);
        log.insert("float64_field", 3.15);
        log.insert("bool_field", true);
        log.insert("bytes_field", bytes::Bytes::from("binary"));
        log.insert("timestamp_field", Utc::now());

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![
            Field::new("string_field", DataType::Utf8, true),
            Field::new("int8_field", DataType::Int8, true),
            Field::new("int16_field", DataType::Int16, true),
            Field::new("int32_field", DataType::Int32, true),
            Field::new("int64_field", DataType::Int64, true),
            Field::new("float32_field", DataType::Float32, true),
            Field::new("float64_field", DataType::Float64, true),
            Field::new("bool_field", DataType::Boolean, true),
            Field::new("bytes_field", DataType::Binary, true),
            Field::new(
                "timestamp_field",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
        ]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        // Decode the produced IPC stream back into a record batch.
        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 10);

        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .value(0),
            "test"
        );

        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<arrow::array::Int8Array>()
                .unwrap()
                .value(0),
            127
        );

        assert_eq!(
            batch
                .column(2)
                .as_any()
                .downcast_ref::<arrow::array::Int16Array>()
                .unwrap()
                .value(0),
            32000
        );

        assert_eq!(
            batch
                .column(3)
                .as_any()
                .downcast_ref::<arrow::array::Int32Array>()
                .unwrap()
                .value(0),
            1000000
        );

        assert_eq!(
            batch
                .column(4)
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap()
                .value(0),
            42
        );

        // Floats compared with a tolerance (f32 column went through a cast).
        assert!(
            (batch
                .column(5)
                .as_any()
                .downcast_ref::<arrow::array::Float32Array>()
                .unwrap()
                .value(0)
                - 3.15)
                .abs()
                < 0.001
        );

        assert!(
            (batch
                .column(6)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .value(0)
                - 3.15)
                .abs()
                < 0.001
        );

        assert!(
            batch
                .column(7)
                .as_any()
                .downcast_ref::<BooleanArray>()
                .unwrap()
                .value(0),
            "{}",
            true
        );

        assert_eq!(
            batch
                .column(8)
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .value(0),
            b"binary"
        );

        assert!(
            !batch
                .column(9)
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap()
                .is_null(0)
        );
    }
825
    // Each event is missing one of the two fields; the absent values must
    // encode as nulls in the respective columns.
    #[test]
    fn test_encode_null_values() {
        let mut log1 = LogEvent::default();
        log1.insert("field_a", 1);
        let mut log2 = LogEvent::default();
        log2.insert("field_b", 2);
        let events = vec![Event::Log(log1), Event::Log(log2)];

        let schema = Arc::new(Schema::new(vec![
            Field::new("field_a", DataType::Int64, true),
            Field::new("field_b", DataType::Int64, true),
        ]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 2);

        let field_a = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(field_a.value(0), 1);
        assert!(field_a.is_null(1));

        let field_b = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert!(field_b.is_null(0));
        assert_eq!(field_b.value(1), 2);
    }
869
    // A float value in an Int64 column doesn't match the builder's pattern,
    // so it encodes as null rather than failing the batch.
    #[test]
    fn test_encode_type_mismatches() {
        let mut log1 = LogEvent::default();
        log1.insert("field", 42);
        let mut log2 = LogEvent::default();
        log2.insert("field", 3.15);
        let events = vec![Event::Log(log1), Event::Log(log2)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "field",
            DataType::Int64,
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 2);

        let field_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(field_array.value(0), 42);
        // The mismatched float became a null.
        assert!(field_array.is_null(1));
    }
905
    // Objects and arrays targeting Utf8 columns are serialized as JSON text.
    #[test]
    fn test_encode_complex_json_values() {
        use serde_json::json;

        let mut log = LogEvent::default();
        log.insert(
            "object_field",
            json!({"key": "value", "nested": {"count": 42}}),
        );
        log.insert("array_field", json!([1, 2, 3]));

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![
            Field::new("object_field", DataType::Utf8, true),
            Field::new("array_field", DataType::Utf8, true),
        ]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 1);

        let object_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let object_str = object_array.value(0);
        // Key ordering in the JSON text is not pinned; check containment only.
        assert!(object_str.contains("key"));
        assert!(object_str.contains("value"));

        let array_array = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let array_str = array_array.value(0);
        assert_eq!(array_str, "[1,2,3]");
    }
951
    // A schema field with a data type the encoder has no builder for
    // (Duration) must produce `UnsupportedType`.
    #[test]
    fn test_encode_unsupported_type() {
        let mut log = LogEvent::default();
        log.insert("field", "value");

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "field",
            DataType::Duration(TimeUnit::Millisecond),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            ArrowEncodingError::UnsupportedType { .. }
        ));
    }
973
    // Encoding without a schema is an error, not a best-effort inference.
    #[test]
    fn test_encode_without_schema_fails() {
        let mut log1 = LogEvent::default();
        log1.insert("message", "hello");

        let events = vec![Event::Log(log1)];

        let result = encode_events_to_arrow_ipc_stream(&events, None);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            ArrowEncodingError::NoSchemaProvided
        ));
    }
988
    // An empty batch fails with `NoEvents` before the schema is even checked.
    #[test]
    fn test_encode_empty_events() {
        let events: Vec<Event> = vec![];
        let result = encode_events_to_arrow_ipc_stream(&events, None);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
    }
996
    // The same instant encoded at all four timestamp units must round-trip
    // with the unit-appropriate conversion applied.
    #[test]
    fn test_encode_timestamp_precisions() {
        let now = Utc::now();
        let mut log = LogEvent::default();
        log.insert("ts_second", now);
        log.insert("ts_milli", now);
        log.insert("ts_micro", now);
        log.insert("ts_nano", now);

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "ts_second",
                DataType::Timestamp(TimeUnit::Second, None),
                true,
            ),
            Field::new(
                "ts_milli",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(
                "ts_micro",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
            Field::new(
                "ts_nano",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
        ]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 4);

        let ts_second = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap();
        assert!(!ts_second.is_null(0));
        assert_eq!(ts_second.value(0), now.timestamp());

        let ts_milli = batch
            .column(1)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert!(!ts_milli.is_null(0));
        assert_eq!(ts_milli.value(0), now.timestamp_millis());

        let ts_micro = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts_micro.is_null(0));
        assert_eq!(ts_micro.value(0), now.timestamp_micros());

        let ts_nano = batch
            .column(3)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        assert!(!ts_nano.is_null(0));
        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
    }
1074
    // A single timestamp column accepts RFC 3339 strings, native timestamps,
    // and raw integers (taken verbatim in the column's unit) interchangeably.
    #[test]
    fn test_encode_mixed_timestamp_string_and_native() {
        let mut log1 = LogEvent::default();
        log1.insert("ts", "2025-10-22T10:18:44.256Z");
        let mut log2 = LogEvent::default();
        log2.insert("ts", Utc::now());
        let mut log3 = LogEvent::default();
        log3.insert("ts", 1729594724256000000_i64);
        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 3);

        let ts_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();

        assert!(!ts_array.is_null(0));
        assert!(!ts_array.is_null(1));
        assert!(!ts_array.is_null(2));

        // The string form must parse to the same nanosecond value.
        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();
        assert_eq!(ts_array.value(0), expected);

        // The integer form passes through untouched.
        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
    }
1126
    // Strings that don't parse as RFC 3339 become nulls in a nullable
    // timestamp column; valid strings still encode.
    #[test]
    fn test_encode_invalid_string_timestamp() {
        let mut log1 = LogEvent::default();
        log1.insert("timestamp", "not-a-timestamp");

        let mut log2 = LogEvent::default();
        log2.insert("timestamp", "2025-10-22T10:18:44.256Z");
        let mut log3 = LogEvent::default();
        log3.insert("timestamp", "2025-99-99T99:99:99Z");
        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 3);

        let ts_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();

        assert!(ts_array.is_null(0));
        assert!(!ts_array.is_null(1));
        assert!(ts_array.is_null(2));
    }
1168
    // An integer encoded into Decimal128(10, 3) is rescaled: 1000 at scale 3
    // is stored as the raw mantissa 1_000_000.
    #[test]
    fn test_encode_decimal128_from_integer() {
        use arrow::array::Decimal128Array;

        let mut log = LogEvent::default();
        log.insert("quantity", 1000_i64);

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "quantity",
            DataType::Decimal128(10, 3),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 1);

        let decimal_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();

        assert!(!decimal_array.is_null(0));
        assert_eq!(decimal_array.value(0), 1000000_i128);
    }
1206
    // A float encodes into a Decimal256 column; the stored value stays within
    // i128 range because the conversion path widens an i128 mantissa.
    #[test]
    fn test_encode_decimal256() {
        use arrow::array::Decimal256Array;

        let mut log = LogEvent::default();
        log.insert("big_value", 123456789.123456_f64);

        let events = vec![Event::Log(log)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "big_value",
            DataType::Decimal256(50, 6),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 1);

        let decimal_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal256Array>()
            .unwrap();

        assert!(!decimal_array.is_null(0));
        let value = decimal_array.value(0);
        assert!(value.to_i128().is_some());
    }
1245
    // An event missing the decimal field yields a null between two encoded
    // values; 99.99 and 50.00 at scale 2 are stored as mantissas 9999/5000.
    #[test]
    fn test_encode_decimal_null_values() {
        use arrow::array::Decimal128Array;

        let mut log1 = LogEvent::default();
        log1.insert("price", 99.99_f64);

        let log2 = LogEvent::default();
        let mut log3 = LogEvent::default();
        log3.insert("price", 50.00_f64);

        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

        let schema = Arc::new(Schema::new(vec![Field::new(
            "price",
            DataType::Decimal128(10, 2),
            true,
        )]));

        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
        assert!(result.is_ok());

        let bytes = result.unwrap();
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(batch.num_rows(), 3);

        let decimal_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();

        assert!(!decimal_array.is_null(0));
        assert_eq!(decimal_array.value(0), 9999_i128);

        assert!(decimal_array.is_null(1));

        assert!(!decimal_array.is_null(2));
        assert_eq!(decimal_array.value(2), 5000_i128);
    }
1294
1295 #[test]
1296 fn test_encode_unsigned_integer_types() {
1297 use arrow::array::{UInt8Array, UInt16Array, UInt32Array, UInt64Array};
1298
1299 let mut log = LogEvent::default();
1300 log.insert("uint8_field", 255_i64);
1301 log.insert("uint16_field", 65535_i64);
1302 log.insert("uint32_field", 4294967295_i64);
1303 log.insert("uint64_field", 9223372036854775807_i64);
1304
1305 let events = vec![Event::Log(log)];
1306
1307 let schema = Arc::new(Schema::new(vec![
1308 Field::new("uint8_field", DataType::UInt8, true),
1309 Field::new("uint16_field", DataType::UInt16, true),
1310 Field::new("uint32_field", DataType::UInt32, true),
1311 Field::new("uint64_field", DataType::UInt64, true),
1312 ]));
1313
1314 let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1315 assert!(result.is_ok());
1316
1317 let bytes = result.unwrap();
1318 let cursor = Cursor::new(bytes);
1319 let mut reader = StreamReader::try_new(cursor, None).unwrap();
1320 let batch = reader.next().unwrap().unwrap();
1321
1322 assert_eq!(batch.num_rows(), 1);
1323 assert_eq!(batch.num_columns(), 4);
1324
1325 let uint8_array = batch
1327 .column(0)
1328 .as_any()
1329 .downcast_ref::<UInt8Array>()
1330 .unwrap();
1331 assert_eq!(uint8_array.value(0), 255_u8);
1332
1333 let uint16_array = batch
1335 .column(1)
1336 .as_any()
1337 .downcast_ref::<UInt16Array>()
1338 .unwrap();
1339 assert_eq!(uint16_array.value(0), 65535_u16);
1340
1341 let uint32_array = batch
1343 .column(2)
1344 .as_any()
1345 .downcast_ref::<UInt32Array>()
1346 .unwrap();
1347 assert_eq!(uint32_array.value(0), 4294967295_u32);
1348
1349 let uint64_array = batch
1351 .column(3)
1352 .as_any()
1353 .downcast_ref::<UInt64Array>()
1354 .unwrap();
1355 assert_eq!(uint64_array.value(0), 9223372036854775807_u64);
1356 }
1357
1358 #[test]
1359 fn test_encode_unsigned_integers_with_null_and_overflow() {
1360 use arrow::array::{UInt8Array, UInt32Array};
1361
1362 let mut log1 = LogEvent::default();
1363 log1.insert("uint8_field", 100_i64);
1364 log1.insert("uint32_field", 1000_i64);
1365
1366 let mut log2 = LogEvent::default();
1367 log2.insert("uint8_field", 300_i64); log2.insert("uint32_field", -1_i64); let log3 = LogEvent::default();
1371 let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1374
1375 let schema = Arc::new(Schema::new(vec![
1376 Field::new("uint8_field", DataType::UInt8, true),
1377 Field::new("uint32_field", DataType::UInt32, true),
1378 ]));
1379
1380 let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1381 assert!(result.is_ok());
1382
1383 let bytes = result.unwrap();
1384 let cursor = Cursor::new(bytes);
1385 let mut reader = StreamReader::try_new(cursor, None).unwrap();
1386 let batch = reader.next().unwrap().unwrap();
1387
1388 assert_eq!(batch.num_rows(), 3);
1389
1390 let uint8_array = batch
1392 .column(0)
1393 .as_any()
1394 .downcast_ref::<UInt8Array>()
1395 .unwrap();
1396 assert_eq!(uint8_array.value(0), 100_u8); assert!(uint8_array.is_null(1)); assert!(uint8_array.is_null(2)); let uint32_array = batch
1402 .column(1)
1403 .as_any()
1404 .downcast_ref::<UInt32Array>()
1405 .unwrap();
1406 assert_eq!(uint32_array.value(0), 1000_u32); assert!(uint32_array.is_null(1)); assert!(uint32_array.is_null(2)); }
1410
1411 #[test]
1412 fn test_encode_non_nullable_field_with_null_value() {
1413 let mut log1 = LogEvent::default();
1415 log1.insert("required_field", 42);
1416
1417 let log2 = LogEvent::default();
1418 let events = vec![Event::Log(log1), Event::Log(log2)];
1421
1422 let schema = Arc::new(Schema::new(vec![Field::new(
1424 "required_field",
1425 DataType::Int64,
1426 false, )]));
1428
1429 let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
1430 assert!(result.is_err());
1431
1432 match result.unwrap_err() {
1433 ArrowEncodingError::NullConstraint { field_name } => {
1434 assert_eq!(field_name, "required_field");
1435 }
1436 other => panic!("Expected NullConstraint error, got: {:?}", other),
1437 }
1438 }
1439
1440 #[test]
1441 fn test_encode_non_nullable_string_field_with_missing_value() {
1442 let mut log1 = LogEvent::default();
1444 log1.insert("name", "Alice");
1445
1446 let mut log2 = LogEvent::default();
1447 log2.insert("name", "Bob");
1448
1449 let log3 = LogEvent::default();
1450 let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1453
1454 let schema = Arc::new(Schema::new(vec![Field::new(
1455 "name",
1456 DataType::Utf8,
1457 false, )]));
1459
1460 let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
1461 assert!(result.is_err());
1462
1463 match result.unwrap_err() {
1464 ArrowEncodingError::NullConstraint { field_name } => {
1465 assert_eq!(field_name, "name");
1466 }
1467 other => panic!("Expected NullConstraint error, got: {:?}", other),
1468 }
1469 }
1470
1471 #[test]
1472 fn test_encode_non_nullable_field_all_values_present() {
1473 let mut log1 = LogEvent::default();
1475 log1.insert("id", 1);
1476
1477 let mut log2 = LogEvent::default();
1478 log2.insert("id", 2);
1479
1480 let mut log3 = LogEvent::default();
1481 log3.insert("id", 3);
1482
1483 let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1484
1485 let schema = Arc::new(Schema::new(vec![Field::new(
1486 "id",
1487 DataType::Int64,
1488 false, )]));
1490
1491 let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1492 assert!(result.is_ok());
1493
1494 let bytes = result.unwrap();
1495 let cursor = Cursor::new(bytes);
1496 let mut reader = StreamReader::try_new(cursor, None).unwrap();
1497 let batch = reader.next().unwrap().unwrap();
1498
1499 assert_eq!(batch.num_rows(), 3);
1500
1501 let id_array = batch
1502 .column(0)
1503 .as_any()
1504 .downcast_ref::<Int64Array>()
1505 .unwrap();
1506
1507 assert_eq!(id_array.value(0), 1);
1508 assert_eq!(id_array.value(1), 2);
1509 assert_eq!(id_array.value(2), 3);
1510 assert!(!id_array.is_null(0));
1511 assert!(!id_array.is_null(1));
1512 assert!(!id_array.is_null(2));
1513 }
1514
1515 #[test]
1516 fn test_config_allow_nullable_fields_overrides_schema() {
1517 use tokio_util::codec::Encoder;
1518
1519 let mut log1 = LogEvent::default();
1521 log1.insert("strict_field", 42);
1522 let log2 = LogEvent::default();
1523 let events = vec![Event::Log(log1), Event::Log(log2)];
1524
1525 let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
1526
1527 let mut config = ArrowStreamSerializerConfig::new(schema);
1528 config.allow_nullable_fields = true;
1529
1530 let mut serializer =
1531 ArrowStreamSerializer::new(config).expect("Failed to create serializer");
1532
1533 let mut buffer = BytesMut::new();
1534 serializer
1535 .encode(events, &mut buffer)
1536 .expect("Encoding should succeed when allow_nullable_fields is true");
1537
1538 let cursor = Cursor::new(buffer);
1539 let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
1540 let batch = reader.next().unwrap().expect("Failed to read batch");
1541
1542 assert_eq!(batch.num_rows(), 2);
1543
1544 let binding = batch.schema();
1545 let output_field = binding.field(0);
1546 assert!(
1547 output_field.is_nullable(),
1548 "The output schema field should have been transformed to nullable=true"
1549 );
1550
1551 let array = batch
1552 .column(0)
1553 .as_any()
1554 .downcast_ref::<Int64Array>()
1555 .unwrap();
1556
1557 assert_eq!(array.value(0), 42);
1558 assert!(!array.is_null(0));
1559 assert!(
1560 array.is_null(1),
1561 "The missing value should be encoded as null"
1562 );
1563 }
1564
1565 #[test]
1566 fn test_make_field_nullable_with_nested_types() {
1567 let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
1572 let inner_struct =
1573 DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
1574 let list_field = Field::new("item", inner_struct, false);
1575 let list_type = DataType::List(Arc::new(list_field));
1576 let outer_field = Field::new("inner_list", list_type, false);
1577 let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));
1578
1579 let original_field = Field::new("root", outer_struct, false);
1580
1581 let nullable_field = make_field_nullable(&original_field);
1583
1584 assert!(
1586 nullable_field.is_nullable(),
1587 "Root field should be nullable"
1588 );
1589
1590 if let DataType::Struct(root_fields) = nullable_field.data_type() {
1592 let inner_list_field = &root_fields[0];
1593 assert!(
1594 inner_list_field.is_nullable(),
1595 "inner_list field should be nullable"
1596 );
1597
1598 if let DataType::List(list_item_field) = inner_list_field.data_type() {
1600 assert!(
1601 list_item_field.is_nullable(),
1602 "List item field should be nullable"
1603 );
1604
1605 if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
1607 let nested_field = &inner_struct_fields[0];
1608 assert!(
1609 nested_field.is_nullable(),
1610 "nested_field should be nullable"
1611 );
1612 } else {
1613 panic!("Expected Struct type for list items");
1614 }
1615 } else {
1616 panic!("Expected List type for inner_list");
1617 }
1618 } else {
1619 panic!("Expected Struct type for root field");
1620 }
1621 }
1622
1623 #[test]
1624 fn test_make_field_nullable_with_map_type() {
1625 let key_field = Field::new("key", DataType::Utf8, false);
1631 let value_field = Field::new("value", DataType::Int64, false);
1632 let entries_struct =
1633 DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
1634 let entries_field = Field::new("entries", entries_struct, false);
1635 let map_type = DataType::Map(Arc::new(entries_field), false);
1636
1637 let original_field = Field::new("my_map", map_type, false);
1638
1639 let nullable_field = make_field_nullable(&original_field);
1641
1642 assert!(
1644 nullable_field.is_nullable(),
1645 "Root map field should be nullable"
1646 );
1647
1648 if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
1650 assert!(
1651 entries_field.is_nullable(),
1652 "Map entries field should be nullable"
1653 );
1654
1655 if let DataType::Struct(struct_fields) = entries_field.data_type() {
1657 let key_field = &struct_fields[0];
1658 let value_field = &struct_fields[1];
1659 assert!(key_field.is_nullable(), "Map key field should be nullable");
1660 assert!(
1661 value_field.is_nullable(),
1662 "Map value field should be nullable"
1663 );
1664 } else {
1665 panic!("Expected Struct type for map entries");
1666 }
1667 } else {
1668 panic!("Expected Map type for my_map field");
1669 }
1670 }
1671}