codecs/encoding/format/arrow.rs

1//! Arrow IPC streaming format codec for batched event encoding
2//!
3//! Provides Apache Arrow IPC stream format encoding with static schema support.
4//! This implements the streaming variant of the Arrow IPC protocol, which writes
5//! a continuous stream of record batches without a file footer.
6
7use arrow::{
8    datatypes::{DataType, Field, Fields, Schema, SchemaRef},
9    error::ArrowError,
10    ipc::writer::StreamWriter,
11    json::reader::ReaderBuilder,
12    record_batch::RecordBatch,
13};
14use async_trait::async_trait;
15use bytes::{BufMut, Bytes, BytesMut};
16use snafu::{ResultExt, Snafu, ensure};
17use vector_config::configurable_component;
18use vector_core::event::Event;
19
/// Provides Arrow schema for encoding.
///
/// Sinks can implement this trait to provide custom schema fetching logic.
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Fetch the Arrow schema from the data store.
    ///
    /// This is called during sink configuration build phase to fetch
    /// the schema once at startup, rather than at runtime.
    ///
    /// # Errors
    ///
    /// Implementations typically surface fetch failures as
    /// [`ArrowEncodingError::SchemaFetchError`].
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
31
/// Configuration for Arrow IPC stream serialization
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema to use for encoding
    // Not user-configurable (`serde(skip)`): sinks populate this at build
    // time, e.g. from a `SchemaProvider` implementation.
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// Allow null values for non-nullable fields in the schema.
    ///
    /// When enabled, missing or incompatible values will be encoded as null even for fields
    /// marked as non-nullable in the Arrow schema. This is useful when working with downstream
    /// systems that can handle null values through defaults, computed columns, or other mechanisms.
    ///
    /// When disabled (default), missing values for non-nullable fields will cause encoding errors,
    /// ensuring all required data is present before sending to the sink.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
53
54impl std::fmt::Debug for ArrowStreamSerializerConfig {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("ArrowStreamSerializerConfig")
57            .field(
58                "schema",
59                &self
60                    .schema
61                    .as_ref()
62                    .map(|s| format!("{} fields", s.fields().len())),
63            )
64            .field("allow_nullable_fields", &self.allow_nullable_fields)
65            .finish()
66    }
67}
68
69impl ArrowStreamSerializerConfig {
70    /// Create a new ArrowStreamSerializerConfig with a schema
71    pub fn new(schema: arrow::datatypes::Schema) -> Self {
72        Self {
73            schema: Some(schema),
74            allow_nullable_fields: false,
75        }
76    }
77
78    /// The data type of events that are accepted by `ArrowStreamEncoder`.
79    pub fn input_type(&self) -> vector_core::config::DataType {
80        vector_core::config::DataType::Log
81    }
82
83    /// The schema required by the serializer.
84    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
85        vector_core::schema::Requirement::empty()
86    }
87}
88
/// Arrow IPC stream batch serializer that holds the schema
///
/// Built via [`ArrowStreamSerializer::new`]; if `allow_nullable_fields` was
/// enabled, the stored schema has already had nullability relaxed.
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    // Shared, immutable schema applied to every encoded batch.
    schema: SchemaRef,
}
94
95impl ArrowStreamSerializer {
96    /// Create a new ArrowStreamSerializer with the given configuration
97    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, ArrowEncodingError> {
98        let schema = config.schema.ok_or(ArrowEncodingError::MissingSchema)?;
99
100        // If allow_nullable_fields is enabled, transform the schema once here
101        // instead of on every batch encoding
102        let schema = if config.allow_nullable_fields {
103            let nullable_fields: Fields = schema
104                .fields()
105                .iter()
106                .map(|f| make_field_nullable(f))
107                .collect::<Result<Vec<_>, _>>()?
108                .into();
109            Schema::new_with_metadata(nullable_fields, schema.metadata().clone())
110        } else {
111            schema
112        };
113
114        Ok(Self {
115            schema: SchemaRef::new(schema),
116        })
117    }
118}
119
120impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
121    type Error = ArrowEncodingError;
122
123    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
124        if events.is_empty() {
125            return Err(ArrowEncodingError::NoEvents);
126        }
127
128        let bytes = encode_events_to_arrow_ipc_stream(&events, self.schema.clone())?;
129
130        buffer.extend_from_slice(&bytes);
131        Ok(())
132    }
133}
134
/// Errors that can occur during Arrow encoding
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// Failed to create Arrow record batch
    #[snafu(display("Failed to create Arrow record batch: {source}"))]
    RecordBatchCreation {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// Failed to write Arrow IPC data
    #[snafu(display("Failed to write Arrow IPC data: {source}"))]
    IpcWrite {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// No events provided for encoding
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// Failed to fetch schema from provider
    #[snafu(display("Failed to fetch schema from provider: {message}"))]
    SchemaFetchError {
        /// Error message from the provider
        message: String,
    },

    /// Null value encountered for non-nullable field
    #[snafu(display("Null value for non-nullable field '{field_name}'"))]
    NullConstraint {
        /// The field name (may list several names, comma-separated)
        field_name: String,
    },

    /// Arrow serializer requires a schema
    #[snafu(display("Arrow serializer requires a schema"))]
    MissingSchema,

    /// IO error during encoding
    // `context(false)` lets `?` convert `std::io::Error` directly into this
    // variant without an explicit `.context(...)` call.
    #[snafu(display("IO error: {source}"), context(false))]
    Io {
        /// The underlying IO error
        source: std::io::Error,
    },

    /// Arrow JSON decoding error
    #[snafu(display("Arrow JSON decoding error: {source}"))]
    ArrowJsonDecode {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// Invalid Map schema structure
    #[snafu(display("Invalid Map schema for field '{field_name}': {reason}"))]
    InvalidMapSchema {
        /// The field name
        field_name: String,
        /// Description of the schema violation
        reason: String,
    },
}
197
198/// Encodes a batch of events into Arrow IPC streaming format
199pub fn encode_events_to_arrow_ipc_stream(
200    events: &[Event],
201    schema: SchemaRef,
202) -> Result<Bytes, ArrowEncodingError> {
203    if events.is_empty() {
204        return Err(ArrowEncodingError::NoEvents);
205    }
206
207    let json_values = vector_log_events_to_json_values(events).map_err(|e| {
208        ArrowEncodingError::RecordBatchCreation {
209            source: ArrowError::JsonError(e.to_string()),
210        }
211    })?;
212
213    let record_batch = build_record_batch(schema, &json_values)?;
214
215    let mut buffer = BytesMut::new().writer();
216    let mut writer =
217        StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).context(IpcWriteSnafu)?;
218    writer.write(&record_batch).context(IpcWriteSnafu)?;
219    writer.finish().context(IpcWriteSnafu)?;
220
221    Ok(buffer.into_inner().freeze())
222}
223
224/// Recursively makes a Field and all its nested fields nullable
225fn make_field_nullable(field: &Field) -> Result<Field, ArrowEncodingError> {
226    let new_data_type = match field.data_type() {
227        DataType::List(inner_field) => DataType::List(make_field_nullable(inner_field)?.into()),
228        DataType::Struct(fields) => DataType::Struct(
229            fields
230                .iter()
231                .map(|f| make_field_nullable(f))
232                .collect::<Result<Vec<_>, _>>()?
233                .into(),
234        ),
235        DataType::Map(inner, sorted) => {
236            // A Map's inner field is a "entries" Struct<Key, Value>
237            let DataType::Struct(fields) = inner.data_type() else {
238                return InvalidMapSchemaSnafu {
239                    field_name: field.name(),
240                    reason: format!("inner type must be Struct, found {:?}", inner.data_type()),
241                }
242                .fail();
243            };
244
245            ensure!(
246                fields.len() == 2,
247                InvalidMapSchemaSnafu {
248                    field_name: field.name(),
249                    reason: format!("expected 2 fields (key, value), found {}", fields.len()),
250                },
251            );
252            let key_field = &fields[0];
253            let value_field = &fields[1];
254
255            let new_struct_fields: Fields =
256                [key_field.clone(), make_field_nullable(value_field)?.into()].into();
257
258            // Reconstruct the inner "entries" field
259            // The inner field itself must be non-nullable (only the Map wrapper is nullable)
260            let new_inner_field = inner
261                .as_ref()
262                .clone()
263                .with_data_type(DataType::Struct(new_struct_fields))
264                .with_nullable(false);
265
266            DataType::Map(new_inner_field.into(), *sorted)
267        }
268        other => other.clone(),
269    };
270
271    Ok(field
272        .clone()
273        .with_data_type(new_data_type)
274        .with_nullable(true))
275}
276
/// Find non-nullable schema fields that are missing or null in any of the given events.
///
/// A field violates its non-null constraint when, for at least one value, the
/// field is absent from the value's object map or explicitly null. Values that
/// are not JSON objects count as missing every field. Returns the offending
/// field names in schema order.
pub fn find_null_non_nullable_fields<'a>(
    schema: &'a Schema,
    values: &[serde_json::Value],
) -> Vec<&'a str> {
    schema
        .fields()
        .iter()
        .filter(|field| {
            !field.is_nullable()
                && values.iter().any(|value| {
                    value
                        .as_object()
                        .and_then(|map| map.get(field.name().as_str()))
                        .is_none_or(serde_json::Value::is_null)
                })
        })
        .map(|field| field.name().as_str())
        .collect()
}
298
299pub(crate) fn vector_log_events_to_json_values(
300    events: &[Event],
301) -> Result<Vec<serde_json::Value>, serde_json::Error> {
302    events
303        .iter()
304        .filter_map(Event::maybe_as_log)
305        .map(serde_json::to_value)
306        .collect()
307}
308
309/// Build an Arrow RecordBatch from a slice of events using the provided schema.
310pub(crate) fn build_record_batch(
311    schema: SchemaRef,
312    values: &[serde_json::Value],
313) -> Result<RecordBatch, ArrowEncodingError> {
314    if values.is_empty() {
315        return Err(ArrowEncodingError::NoEvents);
316    }
317
318    let missing = find_null_non_nullable_fields(&schema, values);
319    if !missing.is_empty() {
320        let error: vector_common::Error = Box::new(ArrowEncodingError::NullConstraint {
321            field_name: missing.join(", "),
322        });
323        vector_common::internal_event::emit(crate::internal_events::EncoderNullConstraintError {
324            error: &error,
325            count: values.len(),
326        });
327        return Err(ArrowEncodingError::NullConstraint {
328            field_name: missing.join(", "),
329        });
330    }
331
332    let mut decoder = ReaderBuilder::new(schema)
333        .build_decoder()
334        .context(RecordBatchCreationSnafu)?;
335
336    decoder.serialize(values).context(ArrowJsonDecodeSnafu)?;
337
338    decoder
339        .flush()
340        .context(ArrowJsonDecodeSnafu)?
341        .ok_or(ArrowEncodingError::NoEvents)
342}
343
344#[cfg(test)]
345mod tests {
346    use super::*;
347    use arrow::{
348        array::{Array, AsArray},
349        datatypes::TimeUnit,
350        ipc::reader::StreamReader,
351    };
352    use chrono::Utc;
353    use std::io::Cursor;
354    use vector_core::event::{LogEvent, Value};
355
    /// Helper to encode events and return the decoded RecordBatch
    ///
    /// Round-trips through the IPC stream format: encode the events, then
    /// read the first batch back with a `StreamReader`.
    fn encode_and_decode(
        events: Vec<Event>,
        schema: SchemaRef,
    ) -> Result<RecordBatch, Box<dyn std::error::Error>> {
        let bytes = encode_events_to_arrow_ipc_stream(&events, schema.clone())?;
        let cursor = Cursor::new(bytes);
        let mut reader = StreamReader::try_new(cursor, None)?;
        // Tests always encode exactly one batch, so the first read must succeed.
        Ok(reader.next().unwrap()?)
    }
366
367    /// Create a simple event from key-value pairs
368    fn create_event<V>(fields: Vec<(&str, V)>) -> Event
369    where
370        V: Into<Value>,
371    {
372        let mut log = LogEvent::default();
373        for (key, value) in fields {
374            log.insert(key, value.into());
375        }
376        Event::Log(log)
377    }
378
    mod comprehensive {
        use super::*;

        /// End-to-end round-trip covering every supported Arrow type:
        /// 14 primitive columns plus list, struct, named struct, and map.
        #[test]
        fn test_encode_all_types() {
            use arrow::datatypes::{
                Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
                Int64Type, TimestampMillisecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
            };
            use vrl::value::ObjectMap;

            let now = Utc::now();

            // Create a struct (tuple) value with unnamed fields
            let mut tuple_value = ObjectMap::new();
            tuple_value.insert("f0".into(), Value::Bytes("nested_str".into()));
            tuple_value.insert("f1".into(), Value::Integer(999));

            // Create a named struct (named tuple) value
            let mut named_tuple_value = ObjectMap::new();
            named_tuple_value.insert("category".into(), Value::Bytes("test_category".into()));
            named_tuple_value.insert("tag".into(), Value::Bytes("test_tag".into()));

            // Create a list value
            let list_value = Value::Array(vec![
                Value::Integer(1),
                Value::Integer(2),
                Value::Integer(3),
            ]);

            // Create a map value
            let mut map_value = ObjectMap::new();
            map_value.insert("key1".into(), Value::Integer(100));
            map_value.insert("key2".into(), Value::Integer(200));

            let mut log = LogEvent::default();
            // Primitive types
            log.insert("string_field", "test");
            log.insert("int8_field", 127);
            log.insert("int16_field", 32000);
            log.insert("int32_field", 1000000);
            log.insert("int64_field", 42);
            log.insert("uint8_field", 255);
            log.insert("uint16_field", 65535);
            log.insert("uint32_field", 4000000);
            log.insert("uint64_field", 9000000000_i64);
            log.insert("float32_field", 3.15);
            log.insert("float64_field", 3.15);
            log.insert("bool_field", true);
            log.insert("timestamp_field", now);
            log.insert("decimal_field", 99.99);
            // Complex types
            log.insert("list_field", list_value);
            log.insert("struct_field", Value::Object(tuple_value));
            log.insert("named_struct_field", Value::Object(named_tuple_value));
            log.insert("map_field", Value::Object(map_value));

            let events = vec![Event::Log(log)];

            // Build schema with all supported types
            let struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("f0", DataType::Utf8, true),
                Field::new("f1", DataType::Int64, true),
            ]);

            let named_struct_fields = arrow::datatypes::Fields::from(vec![
                Field::new("category", DataType::Utf8, true),
                Field::new("tag", DataType::Utf8, true),
            ]);

            // Map entries: non-nullable keys, nullable values, per the Arrow spec.
            let map_entries = Field::new(
                "entries",
                DataType::Struct(arrow::datatypes::Fields::from(vec![
                    Field::new("keys", DataType::Utf8, false),
                    Field::new("values", DataType::Int64, true),
                ])),
                false,
            );

            let schema = Schema::new(vec![
                Field::new("string_field", DataType::Utf8, true),
                Field::new("int8_field", DataType::Int8, true),
                Field::new("int16_field", DataType::Int16, true),
                Field::new("int32_field", DataType::Int32, true),
                Field::new("int64_field", DataType::Int64, true),
                Field::new("uint8_field", DataType::UInt8, true),
                Field::new("uint16_field", DataType::UInt16, true),
                Field::new("uint32_field", DataType::UInt32, true),
                Field::new("uint64_field", DataType::UInt64, true),
                Field::new("float32_field", DataType::Float32, true),
                Field::new("float64_field", DataType::Float64, true),
                Field::new("bool_field", DataType::Boolean, true),
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new("decimal_field", DataType::Decimal128(10, 2), true),
                Field::new(
                    "list_field",
                    DataType::List(Field::new("item", DataType::Int64, true).into()),
                    true,
                ),
                Field::new("struct_field", DataType::Struct(struct_fields), true),
                Field::new(
                    "named_struct_field",
                    DataType::Struct(named_struct_fields),
                    true,
                ),
                Field::new("map_field", DataType::Map(map_entries.into(), false), true),
            ])
            .into();

            let batch = encode_and_decode(events, schema).expect("Failed to encode");

            assert_eq!(batch.num_rows(), 1);
            assert_eq!(batch.num_columns(), 18);

            // Verify all primitive types
            assert_eq!(batch.column(0).as_string::<i32>().value(0), "test");
            assert_eq!(batch.column(1).as_primitive::<Int8Type>().value(0), 127);
            assert_eq!(batch.column(2).as_primitive::<Int16Type>().value(0), 32000);
            assert_eq!(
                batch.column(3).as_primitive::<Int32Type>().value(0),
                1000000
            );
            assert_eq!(batch.column(4).as_primitive::<Int64Type>().value(0), 42);
            assert_eq!(batch.column(5).as_primitive::<UInt8Type>().value(0), 255);
            assert_eq!(batch.column(6).as_primitive::<UInt16Type>().value(0), 65535);
            assert_eq!(
                batch.column(7).as_primitive::<UInt32Type>().value(0),
                4000000
            );
            assert_eq!(
                batch.column(8).as_primitive::<UInt64Type>().value(0),
                9000000000
            );
            // Floats compared with a tolerance; exact equality is not expected.
            assert!((batch.column(9).as_primitive::<Float32Type>().value(0) - 3.15).abs() < 0.001);
            assert!((batch.column(10).as_primitive::<Float64Type>().value(0) - 3.15).abs() < 0.001);
            assert!(batch.column(11).as_boolean().value(0));
            assert_eq!(
                batch
                    .column(12)
                    .as_primitive::<TimestampMillisecondType>()
                    .value(0),
                now.timestamp_millis()
            );
            // Decimal128(10, 2): 99.99 is stored as the unscaled integer 9999.
            assert_eq!(
                batch.column(13).as_primitive::<Decimal128Type>().value(0),
                9999
            );

            let list_array = batch.column(14).as_list::<i32>();
            assert!(!list_array.is_null(0));
            let list_values = list_array.value(0);
            assert_eq!(list_values.len(), 3);
            let int_array = list_values.as_primitive::<Int64Type>();
            assert_eq!(int_array.value(0), 1);
            assert_eq!(int_array.value(1), 2);
            assert_eq!(int_array.value(2), 3);

            // Verify struct field (unnamed)
            let struct_array = batch.column(15).as_struct();
            assert!(!struct_array.is_null(0));
            assert_eq!(
                struct_array.column(0).as_string::<i32>().value(0),
                "nested_str"
            );
            assert_eq!(
                struct_array.column(1).as_primitive::<Int64Type>().value(0),
                999
            );

            // Verify named struct field (named tuple)
            let named_struct_array = batch.column(16).as_struct();
            assert!(!named_struct_array.is_null(0));
            assert_eq!(
                named_struct_array.column(0).as_string::<i32>().value(0),
                "test_category"
            );
            assert_eq!(
                named_struct_array.column(1).as_string::<i32>().value(0),
                "test_tag"
            );

            // Verify map field
            let map_array = batch.column(17).as_map();
            assert!(!map_array.is_null(0));
            let map_value = map_array.value(0);
            assert_eq!(map_value.len(), 2);
        }
    }
571
572    mod error_handling {
573        use super::*;
574
575        #[test]
576        fn test_encode_empty_events() {
577            let schema = Schema::new(vec![Field::new("message", DataType::Utf8, true)]).into();
578            let events: Vec<Event> = vec![];
579            let result = encode_events_to_arrow_ipc_stream(&events, schema);
580            assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
581        }
582
583        #[test]
584        fn test_missing_non_nullable_field_errors() {
585            let events = vec![create_event(vec![("other_field", "value")])];
586
587            let schema = Schema::new(vec![Field::new(
588                "required_field",
589                DataType::Utf8,
590                false, // non-nullable
591            )])
592            .into();
593
594            let result = encode_events_to_arrow_ipc_stream(&events, schema);
595            assert!(result.is_err());
596        }
597    }
598
    mod temporal_types {
        use super::*;
        use arrow::datatypes::{
            TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
            TimestampSecondType,
        };

        /// One native timestamp should encode correctly at all four Arrow
        /// timestamp precisions (second/milli/micro/nano).
        #[test]
        fn test_encode_timestamp_precisions() {
            let now = Utc::now();
            let mut log = LogEvent::default();
            log.insert("ts_second", now);
            log.insert("ts_milli", now);
            log.insert("ts_micro", now);
            log.insert("ts_nano", now);

            let events = vec![Event::Log(log)];

            let schema = Schema::new(vec![
                Field::new(
                    "ts_second",
                    DataType::Timestamp(TimeUnit::Second, None),
                    true,
                ),
                Field::new(
                    "ts_milli",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "ts_micro",
                    DataType::Timestamp(TimeUnit::Microsecond, None),
                    true,
                ),
                Field::new(
                    "ts_nano",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
            ])
            .into();

            let batch = encode_and_decode(events, schema).unwrap();

            assert_eq!(batch.num_rows(), 1);
            assert_eq!(batch.num_columns(), 4);

            // Each column should hold the same instant truncated to its unit.
            let ts_second = batch.column(0).as_primitive::<TimestampSecondType>();
            assert!(!ts_second.is_null(0));
            assert_eq!(ts_second.value(0), now.timestamp());

            let ts_milli = batch.column(1).as_primitive::<TimestampMillisecondType>();
            assert!(!ts_milli.is_null(0));
            assert_eq!(ts_milli.value(0), now.timestamp_millis());

            let ts_micro = batch.column(2).as_primitive::<TimestampMicrosecondType>();
            assert!(!ts_micro.is_null(0));
            assert_eq!(ts_micro.value(0), now.timestamp_micros());

            let ts_nano = batch.column(3).as_primitive::<TimestampNanosecondType>();
            assert!(!ts_nano.is_null(0));
            assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
        }

        /// Timestamps may arrive as RFC3339 strings, native timestamps, or raw
        /// integer nanoseconds; all three must decode into the same column.
        #[test]
        fn test_encode_mixed_timestamp_string_native_and_integer() {
            let now = Utc::now();

            let mut log1 = LogEvent::default();
            log1.insert("ts", "2025-10-22T10:18:44.256Z"); // RFC3339 String

            let mut log2 = LogEvent::default();
            log2.insert("ts", now); // Native Timestamp

            let mut log3 = LogEvent::default();
            log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)

            let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];

            let schema = Schema::new(vec![Field::new(
                "ts",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
                true,
            )])
            .into();

            let batch = encode_and_decode(events, schema).unwrap();

            assert_eq!(batch.num_rows(), 3);

            let ts_array = batch.column(0).as_primitive::<TimestampNanosecondType>();

            // All three should be non-null
            assert!(!ts_array.is_null(0));
            assert!(!ts_array.is_null(1));
            assert!(!ts_array.is_null(2));

            // First one should match the parsed RFC3339 string
            let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(ts_array.value(0), expected);

            // Second one should match the native timestamp
            assert_eq!(ts_array.value(1), now.timestamp_nanos_opt().unwrap());

            // Third one should match the integer
            assert_eq!(ts_array.value(2), 1729594724256000000_i64);
        }
    }
710
    mod config_tests {
        use super::*;
        use tokio_util::codec::Encoder;

        /// With `allow_nullable_fields` enabled, a non-nullable schema field is
        /// relaxed at build time and missing values encode as null.
        #[test]
        fn test_config_allow_nullable_fields_overrides_schema() {
            let mut log1 = LogEvent::default();
            log1.insert("strict_field", 42);
            // log2 deliberately omits "strict_field".
            let log2 = LogEvent::default();
            let events = vec![Event::Log(log1), Event::Log(log2)];

            let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);

            let mut config = ArrowStreamSerializerConfig::new(schema);
            config.allow_nullable_fields = true;

            let mut serializer =
                ArrowStreamSerializer::new(config).expect("Failed to create serializer");

            let mut buffer = BytesMut::new();
            serializer
                .encode(events, &mut buffer)
                .expect("Encoding should succeed when allow_nullable_fields is true");

            let cursor = Cursor::new(buffer);
            let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
            let batch = reader.next().unwrap().expect("Failed to read batch");

            assert_eq!(batch.num_rows(), 2);

            let binding = batch.schema();
            let output_field = binding.field(0);
            assert!(
                output_field.is_nullable(),
                "The output schema field should have been transformed to nullable=true"
            );

            let array = batch
                .column(0)
                .as_primitive::<arrow::datatypes::Int64Type>();

            assert_eq!(array.value(0), 42);
            assert!(!array.is_null(0));
            assert!(
                array.is_null(1),
                "The missing value should be encoded as null"
            );
        }

        /// Nullability relaxation must recurse through Struct -> List -> Struct.
        #[test]
        fn test_make_field_nullable_with_nested_types() {
            // Build root: Struct { inner_list: List<Struct { nested_field: Int64 }> },
            // everything non-nullable.
            let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
            let inner_struct =
                DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
            let list_field = Field::new("item", inner_struct, false);
            let list_type = DataType::List(list_field.into());
            let outer_field = Field::new("inner_list", list_type, false);
            let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));

            let original_field = Field::new("root", outer_struct, false);
            let nullable_field = make_field_nullable(&original_field).unwrap();

            assert!(
                nullable_field.is_nullable(),
                "Root field should be nullable"
            );

            // Walk back down the tree; every level must now be nullable.
            if let DataType::Struct(root_fields) = nullable_field.data_type() {
                let inner_list_field = &root_fields[0];
                assert!(inner_list_field.is_nullable());

                if let DataType::List(list_item_field) = inner_list_field.data_type() {
                    assert!(list_item_field.is_nullable());

                    if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
                        let nested_field = &inner_struct_fields[0];
                        assert!(nested_field.is_nullable());
                    } else {
                        panic!("Expected Struct type for list items");
                    }
                } else {
                    panic!("Expected List type for inner_list");
                }
            } else {
                panic!("Expected Struct type for root field");
            }
        }

        /// Map relaxation: wrapper and values become nullable; entries struct
        /// and keys must stay non-nullable per the Arrow spec.
        #[test]
        fn test_make_field_nullable_with_map_type() {
            let key_field = Field::new("key", DataType::Utf8, false);
            let value_field = Field::new("value", DataType::Int64, false);
            let entries_struct =
                DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
            let entries_field = Field::new("entries", entries_struct, false);
            let map_type = DataType::Map(entries_field.into(), false);

            let original_field = Field::new("my_map", map_type, false);
            let nullable_field = make_field_nullable(&original_field).unwrap();

            assert!(
                nullable_field.is_nullable(),
                "Root map field should be nullable"
            );

            if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
                assert!(
                    !entries_field.is_nullable(),
                    "Map entries field should be non-nullable"
                );

                if let DataType::Struct(struct_fields) = entries_field.data_type() {
                    let key_field = &struct_fields[0];
                    let value_field = &struct_fields[1];
                    assert!(
                        !key_field.is_nullable(),
                        "Map key field should be non-nullable"
                    );
                    assert!(
                        value_field.is_nullable(),
                        "Map value field should be nullable"
                    );
                } else {
                    panic!("Expected Struct type for map entries");
                }
            } else {
                panic!("Expected Map type for my_map field");
            }
        }
    }
841
842    mod null_non_nullable {
843        use super::*;
844
845        #[test]
846        fn test_missing_non_nullable_field_error_names_fields() {
847            let schema: SchemaRef = Schema::new(vec![
848                Field::new("required_field", DataType::Utf8, false),
849                Field::new("optional_field", DataType::Utf8, true),
850            ])
851            .into();
852
853            // Event is missing "required_field" entirely
854            let event = create_event(vec![("optional_field", "hello")]);
855
856            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
857            let err = result.unwrap_err().to_string();
858            assert!(
859                err.contains("required_field"),
860                "Error should name the missing field, got: {err}"
861            );
862            assert!(
863                !err.contains("optional_field"),
864                "Error should not name nullable fields, got: {err}"
865            );
866        }
867
868        #[test]
869        fn test_null_value_in_non_nullable_field_error_names_fields() {
870            let schema: SchemaRef = Schema::new(vec![
871                Field::new("id", DataType::Int64, false),
872                Field::new("name", DataType::Utf8, false),
873            ])
874            .into();
875
876            // Event has "id" but "name" is null
877            let event = create_event(vec![("id", Value::Integer(1))]);
878
879            let result = encode_events_to_arrow_ipc_stream(&[event], schema);
880            let err = result.unwrap_err().to_string();
881            assert!(
882                err.contains("name"),
883                "Error should name the null field, got: {err}"
884            );
885        }
886
887        #[test]
888        fn test_find_null_non_nullable_fields_returns_empty_when_all_present() {
889            let schema = Schema::new(vec![
890                Field::new("a", DataType::Utf8, false),
891                Field::new("b", DataType::Int64, false),
892            ]);
893
894            let event = create_event(vec![
895                ("a", Value::Bytes("val".into())),
896                ("b", Value::Integer(42)),
897            ]);
898            let missing = find_null_non_nullable_fields(
899                &schema,
900                &vector_log_events_to_json_values(&[event]).unwrap(),
901            );
902            assert!(
903                missing.is_empty(),
904                "Expected no missing fields, got: {missing:?}"
905            );
906        }
907
908        #[test]
909        fn test_find_null_non_nullable_fields_detects_explicit_null() {
910            let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
911
912            let event = create_event(vec![("a", Value::Null)]);
913            let missing = find_null_non_nullable_fields(
914                &schema,
915                &vector_log_events_to_json_values(&[event]).unwrap(),
916            );
917            assert_eq!(missing, vec!["a"]);
918        }
919    }
920}