codecs/encoding/format/
arrow.rs

//! Arrow IPC streaming format codec for batched event encoding.
//!
//! Provides Apache Arrow IPC stream format encoding with static schema support.
//! This implements the streaming variant of the Arrow IPC protocol, which writes
//! a continuous stream of record batches without a file footer.
6
7use arrow::{
8    datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9    ipc::writer::StreamWriter,
10    json::reader::ReaderBuilder,
11    record_batch::RecordBatch,
12};
13use async_trait::async_trait;
14use bytes::{BufMut, Bytes, BytesMut};
15use snafu::{ResultExt, Snafu, ensure};
16use vector_config::configurable_component;
17use vector_core::event::Event;
18
19/// Provides Arrow schema for encoding.
20///
21/// Sinks can implement this trait to provide custom schema fetching logic.
22#[async_trait]
23pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
24    /// Fetch the Arrow schema from the data store.
25    ///
26    /// This is called during sink configuration build phase to fetch
27    /// the schema once at startup, rather than at runtime.
28    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
29}
30
31/// Configuration for Arrow IPC stream serialization
32#[configurable_component]
33#[derive(Clone, Default)]
34pub struct ArrowStreamSerializerConfig {
35    /// The Arrow schema to use for encoding
36    #[serde(skip)]
37    #[configurable(derived)]
38    pub schema: Option<arrow::datatypes::Schema>,
39
40    /// Allow null values for non-nullable fields in the schema.
41    ///
42    /// When enabled, missing or incompatible values will be encoded as null even for fields
43    /// marked as non-nullable in the Arrow schema. This is useful when working with downstream
44    /// systems that can handle null values through defaults, computed columns, or other mechanisms.
45    ///
46    /// When disabled (default), missing values for non-nullable fields will cause encoding errors,
47    /// ensuring all required data is present before sending to the sink.
48    #[serde(default)]
49    #[configurable(derived)]
50    pub allow_nullable_fields: bool,
51}
52
53impl std::fmt::Debug for ArrowStreamSerializerConfig {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct("ArrowStreamSerializerConfig")
56            .field(
57                "schema",
58                &self
59                    .schema
60                    .as_ref()
61                    .map(|s| format!("{} fields", s.fields().len())),
62            )
63            .field("allow_nullable_fields", &self.allow_nullable_fields)
64            .finish()
65    }
66}
67
68impl ArrowStreamSerializerConfig {
69    /// Create a new ArrowStreamSerializerConfig with a schema
70    pub fn new(schema: arrow::datatypes::Schema) -> Self {
71        Self {
72            schema: Some(schema),
73            allow_nullable_fields: false,
74        }
75    }
76
77    /// The data type of events that are accepted by `ArrowStreamEncoder`.
78    pub fn input_type(&self) -> vector_core::config::DataType {
79        vector_core::config::DataType::Log
80    }
81
82    /// The schema required by the serializer.
83    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
84        vector_core::schema::Requirement::empty()
85    }
86}
87
88/// Arrow IPC stream batch serializer that holds the schema
89#[derive(Clone, Debug)]
90pub struct ArrowStreamSerializer {
91    schema: SchemaRef,
92}
93
94impl ArrowStreamSerializer {
95    /// Create a new ArrowStreamSerializer with the given configuration
96    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
97        let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
98
99        // If allow_nullable_fields is enabled, transform the schema once here
100        // instead of on every batch encoding
101        let schema = if config.allow_nullable_fields {
102            let nullable_fields: Fields = schema
103                .fields()
104                .iter()
105                .map(|f| make_field_nullable(f))
106                .collect::<Result<Vec<_>, _>>()?
107                .into();
108            Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
109        } else {
110            schema
111        };
112
113        Ok(Self {
114            schema: SchemaRef::new(schema),
115        })
116    }
117}
118
119impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
120    type Error = ArrowEncodingError;
121
122    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
123        if events.is_empty() {
124            return Err(ArrowEncodingError::NoEvents);
125        }
126
127        let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
128
129        buffer.extend_from_slice(&bytes);
130        Ok(())
131    }
132}
133
134/// Errors that can occur during Arrow encoding
135#[derive(Debug, Snafu)]
136pub enum ArrowEncodingError {
137    /// Failed to create Arrow record batch
138    #[snafu(display("Failed to create Arrow record batch: {source}"))]
139    RecordBatchCreation {
140        /// The underlying Arrow error
141        source: arrow::error::ArrowError,
142    },
143
144    /// Failed to write Arrow IPC data
145    #[snafu(display("Failed to write Arrow IPC data: {source}"))]
146    IpcWrite {
147        /// The underlying Arrow error
148        source: arrow::error::ArrowError,
149    },
150
151    /// No events provided for encoding
152    #[snafu(display("No events provided for encoding"))]
153    NoEvents,
154
155    /// Failed to fetch schema from provider
156    #[snafu(display("Failed to fetch schema from provider: {message}"))]
157    SchemaFetchError {
158        /// Error message from the provider
159        message: String,
160    },
161
162    /// Null value encountered for non-nullable field
163    #[snafu(display("Null value for non-nullable field '{field_name}'"))]
164    NullConstraint {
165        /// The field name
166        field_name: String,
167    },
168
169    /// Arrow serializer requires a schema
170    #[snafu(display("Arrow serializer requires a schema"))]
171    MissingSchema,
172
173    /// IO error during encoding
174    #[snafu(display("IO error: {source}"), context(false))]
175    Io {
176        /// The underlying IO error
177        source: std::io::Error,
178    },
179
180    /// Arrow JSON decoding error
181    #[snafu(display("Arrow JSON decoding error: {source}"))]
182    ArrowJsonDecode {
183        /// The underlying Arrow error
184        source: arrow::error::ArrowError,
185    },
186
187    /// Invalid Map schema structure
188    #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
189    InvalidMapSchema {
190        /// The field name
191        field_name: String,
192        /// Description of the schema violation
193        reason: String,
194    },
195}
196
197/// Encodes a batch of events into Arrow IPC streaming format
198pub fn encode_events_to_arrow_ipc_stream(
199    events: &[Event],
200    schema: SchemaRef,
201) -> Result<Bytes, ArrowEncodingError> {
202    if events.is_empty() {
203        return Err(ArrowEncodingError::NoEvents);
204    }
205
206    let record_batch = build_record_batch(schema, events)?;
207
208    let mut buffer = BytesMut::new().writer();
209    let mut writer =
210        StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
211    writer.write(&record_batch).context(IpcWriteSnafu)?;
212    writer.finish().context(IpcWriteSnafu)?;
213
214    Ok(buffer.into_inner().freeze())
215}
216
217/// Recursively makes a Field and all its nested fields nullable
218fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
219    let new_data_type = match field.data_type() {
220        DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
221        DataType::Struct(fields) => DataType::Struct(
222            fields
223                .iter()
224                .map(|f| make_field_nullable(f))
225                .collect::<Result<Vec<_>, _>>()?
226                .into(),
227        ),
228        DataType::Map(inner, sorted) => {
229            // A Map's inner field is a "entries" Struct<Key, Value>
230            let DataType::Struct(fields) = inner.data_type() else {
231                return InvalidMapSchemaSnafu {
232                    field_name: field.name(),
233                    reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
234                }
235                .fail();
236            };
237
238            ensure!(
239                fields.len() == 2,
240                InvalidMapSchemaSnafu {
241                    field_name: field.name(),
242                    reason: format!("expected 2 fields (key, value), found {}", fields.len()),
243                },
244            );
245            let key_field = &fields[0];
246            let value_field = &fields[1];
247
248            let new_struct_fields: Fields =
249                [key_field.clone(), make_field_nullable(value_field)?.into()].into();
250
251            // Reconstruct the inner "entries" field
252            // The inner field itself must be non-nullable (only the Map wrapper is nullable)
253            let new_inner_field = inner
254                .as_ref()
255                .clone()
256                .with_data_type(DataType::Struct(new_struct_fields))
257                .with_nullable(false);
258
259            DataType::Map(new_inner_field.into(), *sorted)
260        }
261        other => other.clone(),
262    };
263
264    Ok(field
265        .clone()
266        .with_data_type(new_data_type)
267        .with_nullable(true))
268}
269
270/// Returns true if the field is absent from the value's object map, or explicitly null.
271/// Find non-nullable schema fields that are missing or null in any of the given events.
272pub fn find_null_non_nullable_fields<'a>(
273    schema: &'a Schema,
274    values: &[&vrl::value::Value],
275) -> Vec<&'a str> {
276    schema
277        .fields()
278        .iter()
279        .filter(|field| {
280            !field.is_nullable()
281                && values.iter().any(|value| {
282                    value
283                        .as_object()
284                        .and_then(|map| map.get(field.name().as_str()))
285                        .is_none_or(vrl::value::Value::is_null)
286                })
287        })
288        .map(|field| field.name().as_str())
289        .collect()
290}
291
292/// Build an Arrow RecordBatch from a slice of events using the provided schema.
293fn build_record_batch(
294    schema: SchemaRef,
295    events: &[Event],
296) -> Result<RecordBatch, ArrowEncodingError> {
297    let values: Vec<_> = events
298        .iter()
299        .filter_map(Event::maybe_as_log)
300        .map(|log| log.value())
301        .collect();
302
303    if values.is_empty() {
304        return Err(ArrowEncodingError::NoEvents);
305    }
306
307    let missing = find_null_non_nullable_fields(&schema, &values);
308    if !missing.is_empty() {
309        for field_name in &missing {
310            let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
311                field_name: field_name.to_string(),
312            });
313            vector_common::internal_event::emit(
314                crate::internal_events::EncoderNullConstraintError { error: &error },
315            );
316        }
317        return Err(ArrowEncodingError::NullConstraint {
318            field_name: missing.join(", "),
319        });
320    }
321
322    let mut decoder = ReaderBuilder::new(schema)
323        .build_decoder()
324        .context(RecordBatchCreationSnafu)?;
325
326    decoder.serialize(&values).context(ArrowJsonDecodeSnafu)?;
327
328    decoder
329        .flush()
330        .context(ArrowJsonDecodeSnafu)?
331        .ok_or(ArrowEncodingError::NoEvents)
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use arrow::{
338        array::{Array, AsArray},
339        datatypes::TimeUnit,
340        ipc::reader::StreamReader,
341    };
342    use chrono::Utc;
343    use std::io::Cursor;
344    use vector_core::event::{LogEvent, Value};
345
346    /// Helper to encode events and return the decoded RecordBatch
347    fn encode_and_decode(
348        events: Vec<Event>,
349        schema: SchemaRef,
350    ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
351        let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
352        let cursor = Cursor::new(bytes);
353        let mut reader = StreamReader::try_new(cursor, None)?;
354        Ok(reader.next().unwrap()?)
355    }
356
357    /// Create a simple event from key-value pairs
358    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
359    where
360        V: Into<Value>,
361    {
362        let mut log = LogEvent::default();
363        for (key, value) in fields {
364            log.insert(key, value.into());
365        }
366        Event::Log(log)
367    }
368
369    mod comprehensive {
370        use super::*;
371
372        #[test]
373        fn test_encode_all_types() {
374            use arrow::datatypes::{
375                Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
376                Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
377            };
378            use vrl::value::ObjectMap;
379
380            let now = Utc::now();
381
382            // Create a struct (tuple) value with unnamed fields
383            let mut tuple_value = ObjectMap::new();
384            tuple_value.insert("f0".into(), Value::Bytes("nested_str".into()));
385            tuple_value.insert("f1".into(), Value::Integer(999));
386
387            // Create a named struct (named tuple) value
388            let mut named_tuple_value = ObjectMap::new();
389            named_tuple_value.insert("category".into(), Value::Bytes("test_category".into()));
390            named_tuple_value.insert("tag".into(), Value::Bytes("test_tag".into()));
391
392            // Create a list value
393            let list_value = Value::Array(vec![
394                Value::Integer(1),
395                Value::Integer(2),
396                Value::Integer(3),
397            ]);
398
399            // Create a map value
400            let mut map_value = ObjectMap::new();
401            map_value.insert("key1".into(), Value::Integer(100));
402            map_value.insert("key2".into(), Value::Integer(200));
403
404            let mut log = LogEvent::default();
405            // Primitive types
406            log.insert("string_field", "test");
407            log.insert("int8_field", 127);
408            log.insert("int16_field", 32000);
409            log.insert("int32_field", 1000000);
410            log.insert("int64_field", 42);
411            log.insert("uint8_field", 255);
412            log.insert("uint16_field", 65535);
413            log.insert("uint32_field", 4000000);
414            log.insert("uint64_field", 9000000000_i64);
415            log.insert("float32_field", 3.15);
416            log.insert("float64_field", 3.15);
417            log.insert("bool_field", true);
418            log.insert("timestamp_field", now);
419            log.insert("decimal_field", 99.99);
420            // Complex types
421            log.insert("list_field", list_value);
422            log.insert("struct_field", Value::Object(tuple_value));
423            log.insert("named_struct_field", Value::Object(named_tuple_value));
424            log.insert("map_field", Value::Object(map_value));
425
426            let events = vec![Event::Log(log)];
427
428            // Build schema with all supported types
429            let struct_fields = arrow::datatypes::Fields::from(vec![
430                Field::new("f0", DataType::Utf8, true),
431                Field::new("f1", DataType::Int64, true),
432            ]);
433
434            let named_struct_fields = arrow::datatypes::Fields::from(vec![
435                Field::new("category", DataType::Utf8, true),
436                Field::new("tag", DataType::Utf8, true),
437            ]);
438
439            let map_entries = Field::new(
440                "entries",
441                DataType::Struct(arrow::datatypes::Fields::from(vec![
442                    Field::new("keys", DataType::Utf8, false),
443                    Field::new("values", DataType::Int64, true),
444                ])),
445                false,
446            );
447
448            let schema = Schema::new(vec![
449                Field::new("string_field", DataType::Utf8, true),
450                Field::new("int8_field", DataType::Int8, true),
451                Field::new("int16_field", DataType::Int16, true),
452                Field::new("int32_field", DataType::Int32, true),
453                Field::new("int64_field", DataType::Int64, true),
454                Field::new("uint8_field", DataType::UInt8, true),
455                Field::new("uint16_field", DataType::UInt16, true),
456                Field::new("uint32_field", DataType::UInt32, true),
457                Field::new("uint64_field", DataType::UInt64, true),
458                Field::new("float32_field", DataType::Float32, true),
459                Field::new("float64_field", DataType::Float64, true),
460                Field::new("bool_field", DataType::Boolean, true),
461                Field::new(
462                    "timestamp_field",
463                    DataType::Timestamp(TimeUnit::Millisecond, None),
464                    true,
465                ),
466                Field::new("decimal_field", DataType::Decimal128(10, 2), true),
467                Field::new(
468                    "list_field",
469                    DataType::List(Field::new("item", DataType::Int64, true).into()),
470                    true,
471                ),
472                Field::new("struct_field", DataType::Struct(struct_fields), true),
473                Field::new(
474                    "named_struct_field",
475                    DataType::Struct(named_struct_fields),
476                    true,
477                ),
478                Field::new("map_field", DataType::Map(map_entries.into(), false), true),
479            ])
480            .into();
481
482            let batch = encode_and_decode(events, schema).expect("Failed to encode");
483
484            assert_eq!(batch.num_rows(), 1);
485            assert_eq!(batch.num_columns(), 18);
486
487            // Verify all primitive types
488            assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
489            assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
490            assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
491            assert_eq!(
492                batch.column(3).as_primitive::<Int32Type>().value(0),
493                1000000
494            );
495            assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
496            assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
497            assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
498            assert_eq!(
499                batch.column(7).as_primitive::<UInt32Type>().value(0),
500                4000000
501            );
502            assert_eq!(
503                batch.column(8).as_primitive::<UInt64Type>().value(0),
504                9000000000
505            );
506            assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
507            assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
508            assert!(batch.column(11).as_boolean().value(0));
509            assert_eq!(
510                batch
511                    .column(12)
512                    .as_primitive::<TimestampMillisecondType>()
513                    .value(0),
514                now.timestamp_millis()
515            );
516            assert_eq!(
517                batch.column(13).as_primitive::<Decimal128Type>().value(0),
518                9999
519            );
520
521            let list_array = batch.column(14).as_list::<i32>();
522            assert!(!list_array.is_null(0));
523            let list_values = list_array.value(0);
524            assert_eq!(list_values.len(), 3);
525            let int_array = list_values.as_primitive::<Int64Type>();
526            assert_eq!(int_array.value(0), 1);
527            assert_eq!(int_array.value(1), 2);
528            assert_eq!(int_array.value(2), 3);
529
530            // Verify struct field (unnamed)
531            let struct_array = batch.column(15).as_struct();
532            assert!(!struct_array.is_null(0));
533            assert_eq!(
534                struct_array.column(0).as_string::<i32>().value(0),
535                "nested_str"
536            );
537            assert_eq!(
538                struct_array.column(1).as_primitive::<Int64Type>().value(0),
539                999
540            );
541
542            // Verify named struct field (named tuple)
543            let named_struct_array = batch.column(16).as_struct();
544            assert!(!named_struct_array.is_null(0));
545            assert_eq!(
546                named_struct_array.column(0).as_string::<i32>().value(0),
547                "test_category"
548            );
549            assert_eq!(
550                named_struct_array.column(1).as_string::<i32>().value(0),
551                "test_tag"
552            );
553
554            // Verify map field
555            let map_array = batch.column(17).as_map();
556            assert!(!map_array.is_null(0));
557            let map_value = map_array.value(0);
558            assert_eq!(map_value.len(), 2);
559        }
560    }
561
562    mod error_handling {
563        use super::*;
564
565        #[test]
566        fn test_encode_empty_events() {
567            let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
568            let events: Vec<Event> = vec![];
569            let result = encode_events_to_arrow_ipc_stream(&events, schema);
570            assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
571        }
572
573        #[test]
574        fn test_missing_non_nullable_field_errors() {
575            let events = vec![create_event(vec![("other_field", "value")])];
576
577            let schema = Schema::new(vec![Field::new(
578                "required_field",
579                DataType::Utf8,
580                false, // non-nullable
581            )])
582            .into();
583
584            let result = encode_events_to_arrow_ipc_stream(&events, schema);
585            assert!(result.is_err());
586        }
587    }
588
589    mod temporal_types {
590        use super::*;
591        use arrow::datatypes::{
592            TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
593            TimestampSecondType,
594        };
595
596        #[test]
597        fn test_encode_timestamp_precisions() {
598            let now = Utc::now();
599            let mut log = LogEvent::default();
600            log.insert("ts_second", now);
601            log.insert("ts_milli", now);
602            log.insert("ts_micro", now);
603            log.insert("ts_nano", now);
604
605            let events = vec![Event::Log(log)];
606
607            let schema = Schema::new(vec![
608                Field::new(
609                    "ts_second",
610                    DataType::Timestamp(TimeUnit::Second, None),
611                    true,
612                ),
613                Field::new(
614                    "ts_milli",
615                    DataType::Timestamp(TimeUnit::Millisecond, None),
616                    true,
617                ),
618                Field::new(
619                    "ts_micro",
620                    DataType::Timestamp(TimeUnit::Microsecond, None),
621                    true,
622                ),
623                Field::new(
624                    "ts_nano",
625                    DataType::Timestamp(TimeUnit::Nanosecond, None),
626                    true,
627                ),
628            ])
629            .into();
630
631            let batch = encode_and_decode(events, schema).unwrap();
632
633            assert_eq!(batch.num_rows(), 1);
634            assert_eq!(batch.num_columns(), 4);
635
636            let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
637            assert!(!ts_second.is_null(0));
638            assert_eq!(ts_second.value(0), now.timestamp());
639
640            let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
641            assert!(!ts_milli.is_null(0));
642            assert_eq!(ts_milli.value(0), now.timestamp_millis());
643
644            let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
645            assert!(!ts_micro.is_null(0));
646            assert_eq!(ts_micro.value(0), now.timestamp_micros());
647
648            let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
649            assert!(!ts_nano.is_null(0));
650            assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
651        }
652
653        #[test]
654        fn test_encode_mixed_timestamp_string_native_and_integer() {
655            let now = Utc::now();
656
657            let mut log1 = LogEvent::default();
658            log1.insert("ts", "2025-10-22T10:18:44.256Z"); // RFC3339 String
659
660            let mut log2 = LogEvent::default();
661            log2.insert("ts", now); // Native Timestamp
662
663            let mut log3 = LogEvent::default();
664            log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
665
666            let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
667
668            let schema = Schema::new(vec![Field::new(
669                "ts",
670                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
671                true,
672            )])
673            .into();
674
675            let batch = encode_and_decode(events, schema).unwrap();
676
677            assert_eq!(batch.num_rows(), 3);
678
679            let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();
680
681            // All three should be non-null
682            assert!(!ts_array.is_null(0));
683            assert!(!ts_array.is_null(1));
684            assert!(!ts_array.is_null(2));
685
686            // First one should match the parsed RFC3339 string
687            let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
688                .unwrap()
689                .timestamp_nanos_opt()
690                .unwrap();
691            assert_eq!(ts_array.value(0), expected);
692
693            // Second one should match the native timestamp
694            assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());
695
696            // Third one should match the integer
697            assert_eq!(ts_array.value(2), 1729594724256000000_i64);
698        }
699    }
700
701    mod config_tests {
702        use super::*;
703        use tokio_util::codec::Encoder;
704
705        #[test]
706        fn test_config_allow_nullable_fields_overrides_schema() {
707            let mut log1 = LogEvent::default();
708            log1.insert("strict_field", 42);
709            let log2 = LogEvent::default();
710            let events = vec![Event::Log(log1), Event::Log(log2)];
711
712            let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
713
714            let mut config = ArrowStreamSerializerConfig::new(schema);
715            config.allow_nullable_fields = true;
716
717            let mut serializer =
718                ArrowStreamSerializer::new(config).expect("Failed to create serializer");
719
720            let mut buffer = BytesMut::new();
721            serializer
722                .encode(events, &mut buffer)
723                .expect("Encoding should succeed when allow_nullable_fields is true");
724
725            let cursor = Cursor::new(buffer);
726            let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
727            let batch = reader.next().unwrap().expect("Failed to read batch");
728
729            assert_eq!(batch.num_rows(), 2);
730
731            let binding = batch.schema();
732            let output_field = binding.field(0);
733            assert!(
734                output_field.is_nullable(),
735                "The output schema field should have been transformed to nullable=true"
736            );
737
738            let array = batch
739                .column(0)
740                .as_primitive::<arrow::datatypes::Int64Type>();
741
742            assert_eq!(array.value(0), 42);
743            assert!(!array.is_null(0));
744            assert!(
745                array.is_null(1),
746                "The missing value should be encoded as null"
747            );
748        }
749
750        #[test]
751        fn test_make_field_nullable_with_nested_types() {
752            let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
753            let inner_struct =
754                DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
755            let list_field = Field::new("item", inner_struct, false);
756            let list_type = DataType::List(list_field.into());
757            let outer_field = Field::new("inner_list", list_type, false);
758            let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));
759
760            let original_field = Field::new("root", outer_struct, false);
761            let nullable_field = make_field_nullable(&original_field).unwrap();
762
763            assert!(
764                nullable_field.is_nullable(),
765                "Root field should be nullable"
766            );
767
768            if let DataType::Struct(root_fields) = nullable_field.data_type() {
769                let inner_list_field = &root_fields[0];
770                assert!(inner_list_field.is_nullable());
771
772                if let DataType::List(list_item_field) = inner_list_field.data_type() {
773                    assert!(list_item_field.is_nullable());
774
775                    if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
776                        let nested_field = &inner_struct_fields[0];
777                        assert!(nested_field.is_nullable());
778                    } else {
779                        panic!("Expected Struct type for list items");
780                    }
781                } else {
782                    panic!("Expected List type for inner_list");
783                }
784            } else {
785                panic!("Expected Struct type for root field");
786            }
787        }
788
789        #[test]
790        fn test_make_field_nullable_with_map_type() {
791            let key_field = Field::new("key", DataType::Utf8, false);
792            let value_field = Field::new("value", DataType::Int64, false);
793            let entries_struct =
794                DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
795            let entries_field = Field::new("entries", entries_struct, false);
796            let map_type = DataType::Map(entries_field.into(), false);
797
798            let original_field = Field::new("my_map", map_type, false);
799            let nullable_field = make_field_nullable(&original_field).unwrap();
800
801            assert!(
802                nullable_field.is_nullable(),
803                "Root map field should be nullable"
804            );
805
806            if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
807                assert!(
808                    !entries_field.is_nullable(),
809                    "Map entries field should be non-nullable"
810                );
811
812                if let DataType::Struct(struct_fields) = entries_field.data_type() {
813                    let key_field = &struct_fields[0];
814                    let value_field = &struct_fields[1];
815                    assert!(
816                        !key_field.is_nullable(),
817                        "Map key field should be non-nullable"
818                    );
819                    assert!(
820                        value_field.is_nullable(),
821                        "Map value field should be nullable"
822                    );
823                } else {
824                    panic!("Expected Struct type for map entries");
825                }
826            } else {
827                panic!("Expected Map type for my_map field");
828            }
829        }
830    }
831
832    mod null_non_nullable {
833        use super::*;
834
835        #[test]
836        fn test_missing_non_nullable_field_error_names_fields() {
837            let schema: SchemaRef = Schema::new(vec![
838                Field::new("required_field", DataType::Utf8, false),
839                Field::new("optional_field", DataType::Utf8, true),
840            ])
841            .into();
842
843            // Event is missing "required_field" entirely
844            let event = create_event(vec![("optional_field", "hello")]);
845
846            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
847            let err = result.unwrap_err().to_string();
848            assert!(
849                err.contains("required_field"),
850                "Error should name the missing field, got: {err}"
851            );
852            assert!(
853                !err.contains("optional_field"),
854                "Error should not name nullable fields, got: {err}"
855            );
856        }
857
858        #[test]
859        fn test_null_value_in_non_nullable_field_error_names_fields() {
860            let schema: SchemaRef = Schema::new(vec![
861                Field::new("id", DataType::Int64, false),
862                Field::new("name", DataType::Utf8, false),
863            ])
864            .into();
865
866            // Event has "id" but "name" is null
867            let event = create_event(vec![("id", Value::Integer(1))]);
868
869            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
870            let err = result.unwrap_err().to_string();
871            assert!(
872                err.contains("name"),
873                "Error should name the null field, got: {err}"
874            );
875        }
876
877        #[test]
878        fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
879            let schema = Schema::new(vec![
880                Field::new("a", DataType::Utf8, false),
881                Field::new("b", DataType::Int64, false),
882            ]);
883
884            let event = create_event(vec![
885                ("a", Value::Bytes("val".into())),
886                ("b", Value::Integer(42)),
887            ]);
888            let value = event.as_log().value();
889            let missing = find_null_non_nullable_fields(&schema, &[value]);
890            assert!(
891                missing.is_empty(),
892                "Expected no missing fields, got: {missing:?}"
893            );
894        }
895
896        #[test]
897        fn test_find_null_non_nullable_fields_detects_explicit_null() {
898            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
899
900            let event = create_event(vec![("a", Value::Null)]);
901            let value = event.as_log().value();
902            let missing = find_null_non_nullable_fields(&schema, &[value]);
903            assert_eq!(missing, vec!["a"]);
904        }
905    }
906}