codecs/encoding/format/
arrow.rs

1//! Arrow IPC streaming format codec for batched event encoding
2//!
3//! Provides Apache Arrow IPC stream format encoding with static schema support.
4//! This implements the streaming variant of the Arrow IPC protocol, which writes
5//! a continuous stream of record batches without a file footer.
6
7use arrow::{
8    array::{
9        ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder, Decimal256Builder,
10        Float32Builder, Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder,
11        StringBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
12        TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder, UInt16Builder,
13        UInt32Builder, UInt64Builder,
14    },
15    datatypes::{DataType, Schema, TimeUnit, i256},
16    ipc::writer::StreamWriter,
17    record_batch::RecordBatch,
18};
19use async_trait::async_trait;
20use bytes::{BufMut, Bytes, BytesMut};
21use chrono::{DateTime, Utc};
22use rust_decimal::Decimal;
23use snafu::Snafu;
24use std::sync::Arc;
25use vector_config::configurable_component;
26
27use vector_core::event::{Event, Value};
28
/// Provides Arrow schema for encoding.
///
/// Sinks can implement this trait to provide custom schema fetching logic.
#[async_trait]
pub trait SchemaProvider: Send + Sync + std::fmt::Debug {
    /// Fetch the Arrow schema from the data store.
    ///
    /// This is called during sink configuration build phase to fetch
    /// the schema once at startup, rather than at runtime.
    ///
    /// # Errors
    ///
    /// Implementations typically report fetch failures via
    /// [`ArrowEncodingError::SchemaFetchError`].
    async fn get_schema(&self) -> Result<Schema, ArrowEncodingError>;
}
40
/// Configuration for Arrow IPC stream serialization
#[configurable_component]
#[derive(Clone, Default)]
pub struct ArrowStreamSerializerConfig {
    /// The Arrow schema to use for encoding
    // Populated programmatically by the sink (e.g. via a `SchemaProvider`);
    // `serde(skip)` keeps it out of user-facing configuration files.
    #[serde(skip)]
    #[configurable(derived)]
    pub schema: Option<arrow::datatypes::Schema>,

    /// Allow null values for non-nullable fields in the schema.
    ///
    /// When enabled, missing or incompatible values will be encoded as null even for fields
    /// marked as non-nullable in the Arrow schema. This is useful when working with downstream
    /// systems that can handle null values through defaults, computed columns, or other mechanisms.
    ///
    /// When disabled (default), missing values for non-nullable fields will cause encoding errors,
    /// ensuring all required data is present before sending to the sink.
    #[serde(default)]
    #[configurable(derived)]
    pub allow_nullable_fields: bool,
}
62
63impl std::fmt::Debug for ArrowStreamSerializerConfig {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        f.debug_struct("ArrowStreamSerializerConfig")
66            .field(
67                "schema",
68                &self
69                    .schema
70                    .as_ref()
71                    .map(|s| format!("{} fields", s.fields().len())),
72            )
73            .field("allow_nullable_fields", &self.allow_nullable_fields)
74            .finish()
75    }
76}
77
impl ArrowStreamSerializerConfig {
    /// Create a new ArrowStreamSerializerConfig with a schema
    ///
    /// Null checking is strict by default (`allow_nullable_fields` is `false`).
    pub fn new(schema: arrow::datatypes::Schema) -> Self {
        Self {
            schema: Some(schema),
            allow_nullable_fields: false,
        }
    }

    /// The data type of events that are accepted by `ArrowStreamEncoder`.
    ///
    /// Only log events are supported; the column builders ignore other kinds.
    pub fn input_type(&self) -> vector_core::config::DataType {
        vector_core::config::DataType::Log
    }

    /// The schema required by the serializer.
    ///
    /// Empty: no specific event-field semantics are required up front.
    pub fn schema_requirement(&self) -> vector_core::schema::Requirement {
        vector_core::schema::Requirement::empty()
    }
}
97
/// Arrow IPC stream batch serializer that holds the schema
///
/// The schema is wrapped in an `Arc` so clones of the serializer share a
/// single schema allocation.
#[derive(Clone, Debug)]
pub struct ArrowStreamSerializer {
    // Pre-processed schema: already relaxed to all-nullable fields when the
    // config requested `allow_nullable_fields` (see `ArrowStreamSerializer::new`).
    schema: Arc<Schema>,
}
103
104impl ArrowStreamSerializer {
105    /// Create a new ArrowStreamSerializer with the given configuration
106    pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
107        let schema = config
108            .schema
109            .ok_or_else(|| vector_common::Error::from("Arrow serializer requires a schema."))?;
110
111        // If allow_nullable_fields is enabled, transform the schema once here
112        // instead of on every batch encoding
113        let schema = if config.allow_nullable_fields {
114            Schema::new_with_metadata(
115                schema
116                    .fields()
117                    .iter()
118                    .map(|f| Arc::new(make_field_nullable(f)))
119                    .collect::<Vec<_>>(),
120                schema.metadata().clone(),
121            )
122        } else {
123            schema
124        };
125
126        Ok(Self {
127            schema: Arc::new(schema),
128        })
129    }
130}
131
132impl tokio_util::codec::Encoder<Vec<Event>> for ArrowStreamSerializer {
133    type Error = ArrowEncodingError;
134
135    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
136        if events.is_empty() {
137            return Err(ArrowEncodingError::NoEvents);
138        }
139
140        let bytes = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&self.schema)))?;
141
142        buffer.extend_from_slice(&bytes);
143        Ok(())
144    }
145}
146
/// Errors that can occur during Arrow encoding
///
/// `Display` text comes from the per-variant snafu `display` attributes;
/// variants with a `source` field also expose it via `Error::source`.
#[derive(Debug, Snafu)]
pub enum ArrowEncodingError {
    /// Failed to create Arrow record batch
    #[snafu(display("Failed to create Arrow record batch: {}", source))]
    RecordBatchCreation {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// Failed to write Arrow IPC data
    #[snafu(display("Failed to write Arrow IPC data: {}", source))]
    IpcWrite {
        /// The underlying Arrow error
        source: arrow::error::ArrowError,
    },

    /// No events provided for encoding
    // Returned for empty batches by the encoder and the free encoding function.
    #[snafu(display("No events provided for encoding"))]
    NoEvents,

    /// Schema must be provided before encoding
    #[snafu(display("Schema must be provided before encoding"))]
    NoSchemaProvided,

    /// Failed to fetch schema from provider
    // Used by `SchemaProvider` implementations; carries a plain message since
    // provider error types are not known here.
    #[snafu(display("Failed to fetch schema from provider: {}", message))]
    SchemaFetchError {
        /// Error message from the provider
        message: String,
    },

    /// Unsupported Arrow data type for field
    #[snafu(display(
        "Unsupported Arrow data type for field '{}': {:?}",
        field_name,
        data_type
    ))]
    UnsupportedType {
        /// The field name
        field_name: String,
        /// The unsupported data type
        data_type: DataType,
    },

    /// Null value encountered for non-nullable field
    #[snafu(display("Null value for non-nullable field '{}'", field_name))]
    NullConstraint {
        /// The field name
        field_name: String,
    },

    /// IO error during encoding
    #[snafu(display("IO error: {}", source))]
    Io {
        /// The underlying IO error
        source: std::io::Error,
    },
}
206
207impl From<std::io::Error> for ArrowEncodingError {
208    fn from(error: std::io::Error) -> Self {
209        Self::Io { source: error }
210    }
211}
212
213/// Encodes a batch of events into Arrow IPC streaming format
214pub fn encode_events_to_arrow_ipc_stream(
215    events: &[Event],
216    schema: Option<Arc<Schema>>,
217) -> Result<Bytes, ArrowEncodingError> {
218    if events.is_empty() {
219        return Err(ArrowEncodingError::NoEvents);
220    }
221
222    let schema_ref = schema.ok_or(ArrowEncodingError::NoSchemaProvided)?;
223
224    let record_batch = build_record_batch(schema_ref, events)?;
225
226    let ipc_err = |source| ArrowEncodingError::IpcWrite { source };
227
228    let mut buffer = BytesMut::new().writer();
229    let mut writer =
230        StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).map_err(ipc_err)?;
231    writer.write(&record_batch).map_err(ipc_err)?;
232    writer.finish().map_err(ipc_err)?;
233
234    Ok(buffer.into_inner().freeze())
235}
236
237/// Recursively makes a Field and all its nested fields nullable
238fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
239    let new_data_type = match field.data_type() {
240        DataType::List(inner_field) => DataType::List(Arc::new(make_field_nullable(inner_field))),
241        DataType::Struct(fields) => {
242            DataType::Struct(fields.iter().map(|f| make_field_nullable(f)).collect())
243        }
244        DataType::Map(inner_field, sorted) => {
245            DataType::Map(Arc::new(make_field_nullable(inner_field)), *sorted)
246        }
247        other => other.clone(),
248    };
249
250    field
251        .clone()
252        .with_data_type(new_data_type)
253        .with_nullable(true)
254}
255
256/// Builds an Arrow RecordBatch from events
257fn build_record_batch(
258    schema: Arc<Schema>,
259    events: &[Event],
260) -> Result<RecordBatch, ArrowEncodingError> {
261    let num_fields = schema.fields().len();
262    let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_fields);
263
264    for field in schema.fields() {
265        let field_name = field.name();
266        let nullable = field.is_nullable();
267        let array: ArrayRef = match field.data_type() {
268            DataType::Timestamp(time_unit, _) => {
269                build_timestamp_array(events, field_name, *time_unit, nullable)?
270            }
271            DataType::Utf8 => build_string_array(events, field_name, nullable)?,
272            DataType::Int8 => build_int8_array(events, field_name, nullable)?,
273            DataType::Int16 => build_int16_array(events, field_name, nullable)?,
274            DataType::Int32 => build_int32_array(events, field_name, nullable)?,
275            DataType::Int64 => build_int64_array(events, field_name, nullable)?,
276            DataType::UInt8 => build_uint8_array(events, field_name, nullable)?,
277            DataType::UInt16 => build_uint16_array(events, field_name, nullable)?,
278            DataType::UInt32 => build_uint32_array(events, field_name, nullable)?,
279            DataType::UInt64 => build_uint64_array(events, field_name, nullable)?,
280            DataType::Float32 => build_float32_array(events, field_name, nullable)?,
281            DataType::Float64 => build_float64_array(events, field_name, nullable)?,
282            DataType::Boolean => build_boolean_array(events, field_name, nullable)?,
283            DataType::Binary => build_binary_array(events, field_name, nullable)?,
284            DataType::Decimal128(precision, scale) => {
285                build_decimal128_array(events, field_name, *precision, *scale, nullable)?
286            }
287            DataType::Decimal256(precision, scale) => {
288                build_decimal256_array(events, field_name, *precision, *scale, nullable)?
289            }
290            other_type => {
291                return Err(ArrowEncodingError::UnsupportedType {
292                    field_name: field_name.into(),
293                    data_type: other_type.clone(),
294                });
295            }
296        };
297
298        columns.push(array);
299    }
300
301    RecordBatch::try_new(schema, columns)
302        .map_err(|source| ArrowEncodingError::RecordBatchCreation { source })
303}
304
/// Macro to handle appending null or returning an error for non-nullable fields.
///
/// Expands to an early `return Err(NullConstraint { .. })` when the field is
/// not nullable, so it must only be invoked inside functions returning
/// `Result<_, ArrowEncodingError>`.
macro_rules! handle_null_constraints {
    ($builder:expr, $nullable:expr, $field_name:expr) => {{
        if !$nullable {
            return Err(ArrowEncodingError::NullConstraint {
                field_name: $field_name.into(),
            });
        }
        $builder.append_null();
    }};
}
316
/// Macro to generate a `build_*_array` function for primitive types.
///
/// The generated function walks every log event, appends a value when one of
/// the supplied patterns (with optional guard) matches the field's value, and
/// routes everything else — missing fields, out-of-range integers, wrong
/// types — through `handle_null_constraints!`. Non-log events contribute no
/// row; every generated builder applies the same filter, so column lengths
/// stay consistent across a batch.
macro_rules! define_build_primitive_array_fn {
    (
        $fn_name:ident, // The function name (e.g., build_int8_array)
        $builder_ty:ty, // The builder type (e.g., Int8Builder)
        // One or more match arms for valid Value types
        $( $value_pat:pat $(if $guard:expr)? => $append_expr:expr ),+
    ) => {
        fn $fn_name(
            events: &[Event],
            field_name: &str,
            nullable: bool,
        ) -> Result<ArrayRef, ArrowEncodingError> {
            let mut builder = <$builder_ty>::with_capacity(events.len());

            for event in events {
                if let Event::Log(log) = event {
                    match log.get(field_name) {
                        $(
                            $value_pat $(if $guard)? => builder.append_value($append_expr),
                        )+
                        // All other patterns are treated as null/invalid
                        _ => handle_null_constraints!(builder, nullable, field_name),
                    }
                }
            }
            Ok(Arc::new(builder.finish()))
        }
    };
}
347
348fn extract_timestamp(value: &Value) -> Option<DateTime<Utc>> {
349    match value {
350        Value::Timestamp(ts) => Some(*ts),
351        Value::Bytes(bytes) => std::str::from_utf8(bytes)
352            .ok()
353            .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
354            .map(|dt| dt.with_timezone(&Utc)),
355        _ => None,
356    }
357}
358
/// Builds a timestamp column for `field_name` at the requested `time_unit`.
///
/// Accepts native timestamps and RFC 3339 strings (via `extract_timestamp`),
/// converting them to the target unit. Raw integers are passed through
/// unchanged — they are assumed to already be expressed in `time_unit`
/// (TODO confirm callers rely on that). Anything else becomes null, subject
/// to the field's null constraint.
fn build_timestamp_array(
    events: &[Event],
    field_name: &str,
    time_unit: TimeUnit,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    // Local macro: the four branches below differ only in builder type and
    // DateTime -> i64 conversion.
    macro_rules! build_array {
        ($builder:ty, $converter:expr) => {{
            let mut builder = <$builder>::with_capacity(events.len());
            for event in events {
                if let Event::Log(log) = event {
                    let value_to_append = log.get(field_name).and_then(|value| {
                        // First, try to extract it as a native or string timestamp
                        if let Some(ts) = extract_timestamp(value) {
                            $converter(&ts)
                        }
                        // Else, fall back to a raw integer
                        else if let Value::Integer(i) = value {
                            Some(*i)
                        }
                        // Else, it's an unsupported type (e.g., Bool, Float)
                        else {
                            None
                        }
                    });

                    if value_to_append.is_none() && !nullable {
                        return Err(ArrowEncodingError::NullConstraint {
                            field_name: field_name.into(),
                        });
                    }

                    builder.append_option(value_to_append);
                }
            }
            Ok(Arc::new(builder.finish()))
        }};
    }

    match time_unit {
        TimeUnit::Second => {
            build_array!(TimestampSecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp()
            ))
        }
        TimeUnit::Millisecond => {
            build_array!(TimestampMillisecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp_millis()
            ))
        }
        TimeUnit::Microsecond => {
            build_array!(TimestampMicrosecondBuilder, |ts: &DateTime<Utc>| Some(
                ts.timestamp_micros()
            ))
        }
        // `timestamp_nanos_opt` returns `None` when the instant cannot be
        // represented as an i64 nanosecond count; such values fall through to
        // the null/constraint handling above.
        TimeUnit::Nanosecond => {
            build_array!(TimestampNanosecondBuilder, |ts: &DateTime<Utc>| ts
                .timestamp_nanos_opt())
        }
    }
}
420
421fn build_string_array(
422    events: &[Event],
423    field_name: &str,
424    nullable: bool,
425) -> Result<ArrayRef, ArrowEncodingError> {
426    let mut builder = StringBuilder::with_capacity(events.len(), 0);
427
428    for event in events {
429        if let Event::Log(log) = event {
430            let mut appended = false;
431            if let Some(value) = log.get(field_name) {
432                match value {
433                    Value::Bytes(bytes) => {
434                        // Attempt direct UTF-8 conversion first, fallback to lossy
435                        match std::str::from_utf8(bytes) {
436                            Ok(s) => builder.append_value(s),
437                            Err(_) => builder.append_value(&String::from_utf8_lossy(bytes)),
438                        }
439                        appended = true;
440                    }
441                    Value::Object(obj) => {
442                        if let Ok(s) = serde_json::to_string(&obj) {
443                            builder.append_value(s);
444                            appended = true;
445                        }
446                    }
447                    Value::Array(arr) => {
448                        if let Ok(s) = serde_json::to_string(&arr) {
449                            builder.append_value(s);
450                            appended = true;
451                        }
452                    }
453                    _ => {
454                        builder.append_value(&value.to_string_lossy());
455                        appended = true;
456                    }
457                }
458            }
459
460            if !appended {
461                handle_null_constraints!(builder, nullable, field_name);
462            }
463        }
464    }
465
466    Ok(Arc::new(builder.finish()))
467}
468
// Signed integer columns: values outside the target width become null
// (subject to the field's null constraint) rather than being wrapped.
define_build_primitive_array_fn!(
    build_int8_array,
    Int8Builder,
    Some(Value::Integer(i)) if *i >= i8::MIN as i64 && *i <= i8::MAX as i64 => *i as i8
);

define_build_primitive_array_fn!(
    build_int16_array,
    Int16Builder,
    Some(Value::Integer(i)) if *i >= i16::MIN as i64 && *i <= i16::MAX as i64 => *i as i16
);

define_build_primitive_array_fn!(
    build_int32_array,
    Int32Builder,
    Some(Value::Integer(i)) if *i >= i32::MIN as i64 && *i <= i32::MAX as i64 => *i as i32
);

define_build_primitive_array_fn!(
    build_int64_array,
    Int64Builder,
    Some(Value::Integer(i)) => *i
);

// Unsigned integer columns: negative or oversized values become null.
define_build_primitive_array_fn!(
    build_uint8_array,
    UInt8Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u8::MAX as i64 => *i as u8
);

define_build_primitive_array_fn!(
    build_uint16_array,
    UInt16Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u16::MAX as i64 => *i as u16
);

define_build_primitive_array_fn!(
    build_uint32_array,
    UInt32Builder,
    Some(Value::Integer(i)) if *i >= 0 && *i <= u32::MAX as i64 => *i as u32
);

define_build_primitive_array_fn!(
    build_uint64_array,
    UInt64Builder,
    Some(Value::Integer(i)) if *i >= 0 => *i as u64
);

// Float columns also accept integers; the integer -> float cast may lose
// precision for very large magnitudes.
define_build_primitive_array_fn!(
    build_float32_array,
    Float32Builder,
    Some(Value::Float(f)) => f.into_inner() as f32,
    Some(Value::Integer(i)) => *i as f32
);

define_build_primitive_array_fn!(
    build_float64_array,
    Float64Builder,
    Some(Value::Float(f)) => f.into_inner(),
    Some(Value::Integer(i)) => *i as f64
);

define_build_primitive_array_fn!(
    build_boolean_array,
    BooleanBuilder,
    Some(Value::Boolean(b)) => *b
);
536
537fn build_binary_array(
538    events: &[Event],
539    field_name: &str,
540    nullable: bool,
541) -> Result<ArrayRef, ArrowEncodingError> {
542    let mut builder = BinaryBuilder::with_capacity(events.len(), 0);
543
544    for event in events {
545        if let Event::Log(log) = event {
546            match log.get(field_name) {
547                Some(Value::Bytes(bytes)) => builder.append_value(bytes),
548                _ => handle_null_constraints!(builder, nullable, field_name),
549            }
550        }
551    }
552
553    Ok(Arc::new(builder.finish()))
554}
555
/// Builds a `Decimal128` column for `field_name`.
///
/// Floats and integers are converted through `rust_decimal::Decimal`,
/// rescaled to the field's scale, and the raw mantissa is appended. Any
/// other value — or a float `Decimal` cannot represent — becomes null,
/// subject to the field's null constraint. Note that `rescale` may round the
/// value to fit the target scale.
fn build_decimal128_array(
    events: &[Event],
    field_name: &str,
    precision: u8,
    scale: i8,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    // `with_precision_and_scale` rejects invalid precision/scale combinations;
    // surface that as an unsupported type for this field.
    let mut builder = Decimal128Builder::with_capacity(events.len())
        .with_precision_and_scale(precision, scale)
        .map_err(|_| ArrowEncodingError::UnsupportedType {
            field_name: field_name.into(),
            data_type: DataType::Decimal128(precision, scale),
        })?;

    // NOTE(review): for a negative Arrow scale this rescales to |scale|,
    // which does not match Arrow's `value = mantissa * 10^|scale|` semantics
    // for negative scales — confirm negative scales never reach this sink.
    let target_scale = scale.unsigned_abs() as u32;

    for event in events {
        if let Event::Log(log) = event {
            let mut appended = false;
            match log.get(field_name) {
                Some(Value::Float(f)) => {
                    // Fails for values Decimal cannot hold (e.g. NaN/inf);
                    // those fall through to the null handling below.
                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
                        decimal.rescale(target_scale);
                        let mantissa = decimal.mantissa();
                        builder.append_value(mantissa);
                        appended = true;
                    }
                }
                Some(Value::Integer(i)) => {
                    let mut decimal = Decimal::from(*i);
                    decimal.rescale(target_scale);
                    let mantissa = decimal.mantissa();
                    builder.append_value(mantissa);
                    appended = true;
                }
                _ => {}
            }

            if !appended {
                handle_null_constraints!(builder, nullable, field_name);
            }
        }
    }

    Ok(Arc::new(builder.finish()))
}
602
/// Builds a `Decimal256` column for `field_name`.
///
/// Mirrors `build_decimal128_array`: values go through `rust_decimal`, are
/// rescaled to the field's scale, and the mantissa is widened to `i256`.
/// Values `Decimal` cannot represent become null, subject to the field's
/// null constraint.
fn build_decimal256_array(
    events: &[Event],
    field_name: &str,
    precision: u8,
    scale: i8,
    nullable: bool,
) -> Result<ArrayRef, ArrowEncodingError> {
    // Invalid precision/scale combinations are reported as unsupported.
    let mut builder = Decimal256Builder::with_capacity(events.len())
        .with_precision_and_scale(precision, scale)
        .map_err(|_| ArrowEncodingError::UnsupportedType {
            field_name: field_name.into(),
            data_type: DataType::Decimal256(precision, scale),
        })?;

    // NOTE(review): same caveat as the Decimal128 builder — rescaling to
    // |scale| is questionable for negative Arrow scales; confirm they are
    // never used here.
    let target_scale = scale.unsigned_abs() as u32;

    for event in events {
        if let Event::Log(log) = event {
            let mut appended = false;
            match log.get(field_name) {
                Some(Value::Float(f)) => {
                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
                        decimal.rescale(target_scale);
                        let mantissa = decimal.mantissa();
                        // rust_decimal does not support i256 natively so we upcast here
                        builder.append_value(i256::from_i128(mantissa));
                        appended = true;
                    }
                }
                Some(Value::Integer(i)) => {
                    let mut decimal = Decimal::from(*i);
                    decimal.rescale(target_scale);
                    let mantissa = decimal.mantissa();
                    builder.append_value(i256::from_i128(mantissa));
                    appended = true;
                }
                _ => {}
            }

            if !appended {
                handle_null_constraints!(builder, nullable, field_name);
            }
        }
    }

    Ok(Arc::new(builder.finish()))
}
650
651#[cfg(test)]
652mod tests {
653    use super::*;
654    use arrow::{
655        array::{
656            Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
657            TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
658            TimestampSecondArray,
659        },
660        datatypes::Field,
661        ipc::reader::StreamReader,
662    };
663    use chrono::Utc;
664    use std::io::Cursor;
665    use vector_core::event::LogEvent;
666
667    #[test]
668    fn test_encode_all_types() {
669        let mut log = LogEvent::default();
670        log.insert("string_field", "test");
671        log.insert("int8_field", 127);
672        log.insert("int16_field", 32000);
673        log.insert("int32_field", 1000000);
674        log.insert("int64_field", 42);
675        log.insert("float32_field", 3.15);
676        log.insert("float64_field", 3.15);
677        log.insert("bool_field", true);
678        log.insert("bytes_field", bytes::Bytes::from("binary"));
679        log.insert("timestamp_field", Utc::now());
680
681        let events = vec![Event::Log(log)];
682
683        let schema = Arc::new(Schema::new(vec![
684            Field::new("string_field", DataType::Utf8, true),
685            Field::new("int8_field", DataType::Int8, true),
686            Field::new("int16_field", DataType::Int16, true),
687            Field::new("int32_field", DataType::Int32, true),
688            Field::new("int64_field", DataType::Int64, true),
689            Field::new("float32_field", DataType::Float32, true),
690            Field::new("float64_field", DataType::Float64, true),
691            Field::new("bool_field", DataType::Boolean, true),
692            Field::new("bytes_field", DataType::Binary, true),
693            Field::new(
694                "timestamp_field",
695                DataType::Timestamp(TimeUnit::Millisecond, None),
696                true,
697            ),
698        ]));
699
700        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
701        assert!(result.is_ok());
702
703        let bytes = result.unwrap();
704        let cursor = Cursor::new(bytes);
705        let mut reader = StreamReader::try_new(cursor, None).unwrap();
706        let batch = reader.next().unwrap().unwrap();
707
708        assert_eq!(batch.num_rows(), 1);
709        assert_eq!(batch.num_columns(), 10);
710
711        // Verify string field
712        assert_eq!(
713            batch
714                .column(0)
715                .as_any()
716                .downcast_ref::<StringArray>()
717                .unwrap()
718                .value(0),
719            "test"
720        );
721
722        // Verify int8 field
723        assert_eq!(
724            batch
725                .column(1)
726                .as_any()
727                .downcast_ref::<arrow::array::Int8Array>()
728                .unwrap()
729                .value(0),
730            127
731        );
732
733        // Verify int16 field
734        assert_eq!(
735            batch
736                .column(2)
737                .as_any()
738                .downcast_ref::<arrow::array::Int16Array>()
739                .unwrap()
740                .value(0),
741            32000
742        );
743
744        // Verify int32 field
745        assert_eq!(
746            batch
747                .column(3)
748                .as_any()
749                .downcast_ref::<arrow::array::Int32Array>()
750                .unwrap()
751                .value(0),
752            1000000
753        );
754
755        // Verify int64 field
756        assert_eq!(
757            batch
758                .column(4)
759                .as_any()
760                .downcast_ref::<Int64Array>()
761                .unwrap()
762                .value(0),
763            42
764        );
765
766        // Verify float32 field
767        assert!(
768            (batch
769                .column(5)
770                .as_any()
771                .downcast_ref::<arrow::array::Float32Array>()
772                .unwrap()
773                .value(0)
774                - 3.15)
775                .abs()
776                < 0.001
777        );
778
779        // Verify float64 field
780        assert!(
781            (batch
782                .column(6)
783                .as_any()
784                .downcast_ref::<Float64Array>()
785                .unwrap()
786                .value(0)
787                - 3.15)
788                .abs()
789                < 0.001
790        );
791
792        // Verify boolean field
793        assert!(
794            batch
795                .column(7)
796                .as_any()
797                .downcast_ref::<BooleanArray>()
798                .unwrap()
799                .value(0),
800            "{}",
801            true
802        );
803
804        // Verify binary field
805        assert_eq!(
806            batch
807                .column(8)
808                .as_any()
809                .downcast_ref::<BinaryArray>()
810                .unwrap()
811                .value(0),
812            b"binary"
813        );
814
815        // Verify timestamp field
816        assert!(
817            !batch
818                .column(9)
819                .as_any()
820                .downcast_ref::<TimestampMillisecondArray>()
821                .unwrap()
822                .is_null(0)
823        );
824    }
825
826    #[test]
827    fn test_encode_null_values() {
828        let mut log1 = LogEvent::default();
829        log1.insert("field_a", 1);
830        // field_b is missing
831
832        let mut log2 = LogEvent::default();
833        log2.insert("field_b", 2);
834        // field_a is missing
835
836        let events = vec![Event::Log(log1), Event::Log(log2)];
837
838        let schema = Arc::new(Schema::new(vec![
839            Field::new("field_a", DataType::Int64, true),
840            Field::new("field_b", DataType::Int64, true),
841        ]));
842
843        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
844        assert!(result.is_ok());
845
846        let bytes = result.unwrap();
847        let cursor = Cursor::new(bytes);
848        let mut reader = StreamReader::try_new(cursor, None).unwrap();
849        let batch = reader.next().unwrap().unwrap();
850
851        assert_eq!(batch.num_rows(), 2);
852
853        let field_a = batch
854            .column(0)
855            .as_any()
856            .downcast_ref::<Int64Array>()
857            .unwrap();
858        assert_eq!(field_a.value(0), 1);
859        assert!(field_a.is_null(1));
860
861        let field_b = batch
862            .column(1)
863            .as_any()
864            .downcast_ref::<Int64Array>()
865            .unwrap();
866        assert!(field_b.is_null(0));
867        assert_eq!(field_b.value(1), 2);
868    }
869
870    #[test]
871    fn test_encode_type_mismatches() {
872        let mut log1 = LogEvent::default();
873        log1.insert("field", 42); // Integer
874
875        let mut log2 = LogEvent::default();
876        log2.insert("field", 3.15); // Float - type mismatch!
877
878        let events = vec![Event::Log(log1), Event::Log(log2)];
879
880        // Schema expects Int64
881        let schema = Arc::new(Schema::new(vec![Field::new(
882            "field",
883            DataType::Int64,
884            true,
885        )]));
886
887        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
888        assert!(result.is_ok());
889
890        let bytes = result.unwrap();
891        let cursor = Cursor::new(bytes);
892        let mut reader = StreamReader::try_new(cursor, None).unwrap();
893        let batch = reader.next().unwrap().unwrap();
894
895        assert_eq!(batch.num_rows(), 2);
896
897        let field_array = batch
898            .column(0)
899            .as_any()
900            .downcast_ref::<Int64Array>()
901            .unwrap();
902        assert_eq!(field_array.value(0), 42);
903        assert!(field_array.is_null(1)); // Type mismatch becomes null
904    }
905
906    #[test]
907    fn test_encode_complex_json_values() {
908        use serde_json::json;
909
910        let mut log = LogEvent::default();
911        log.insert(
912            "object_field",
913            json!({"key": "value", "nested": {"count": 42}}),
914        );
915        log.insert("array_field", json!([1, 2, 3]));
916
917        let events = vec![Event::Log(log)];
918
919        let schema = Arc::new(Schema::new(vec![
920            Field::new("object_field", DataType::Utf8, true),
921            Field::new("array_field", DataType::Utf8, true),
922        ]));
923
924        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
925        assert!(result.is_ok());
926
927        let bytes = result.unwrap();
928        let cursor = Cursor::new(bytes);
929        let mut reader = StreamReader::try_new(cursor, None).unwrap();
930        let batch = reader.next().unwrap().unwrap();
931
932        assert_eq!(batch.num_rows(), 1);
933
934        let object_array = batch
935            .column(0)
936            .as_any()
937            .downcast_ref::<StringArray>()
938            .unwrap();
939        let object_str = object_array.value(0);
940        assert!(object_str.contains("key"));
941        assert!(object_str.contains("value"));
942
943        let array_array = batch
944            .column(1)
945            .as_any()
946            .downcast_ref::<StringArray>()
947            .unwrap();
948        let array_str = array_array.value(0);
949        assert_eq!(array_str, "[1,2,3]");
950    }
951
952    #[test]
953    fn test_encode_unsupported_type() {
954        let mut log = LogEvent::default();
955        log.insert("field", "value");
956
957        let events = vec![Event::Log(log)];
958
959        // Use an unsupported type
960        let schema = Arc::new(Schema::new(vec![Field::new(
961            "field",
962            DataType::Duration(TimeUnit::Millisecond),
963            true,
964        )]));
965
966        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
967        assert!(result.is_err());
968        assert!(matches!(
969            result.unwrap_err(),
970            ArrowEncodingError::UnsupportedType { .. }
971        ));
972    }
973
974    #[test]
975    fn test_encode_without_schema_fails() {
976        let mut log1 = LogEvent::default();
977        log1.insert("message", "hello");
978
979        let events = vec![Event::Log(log1)];
980
981        let result = encode_events_to_arrow_ipc_stream(&events, None);
982        assert!(result.is_err());
983        assert!(matches!(
984            result.unwrap_err(),
985            ArrowEncodingError::NoSchemaProvided
986        ));
987    }
988
989    #[test]
990    fn test_encode_empty_events() {
991        let events: Vec<Event> = vec![];
992        let result = encode_events_to_arrow_ipc_stream(&events, None);
993        assert!(result.is_err());
994        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
995    }
996
997    #[test]
998    fn test_encode_timestamp_precisions() {
999        let now = Utc::now();
1000        let mut log = LogEvent::default();
1001        log.insert("ts_second", now);
1002        log.insert("ts_milli", now);
1003        log.insert("ts_micro", now);
1004        log.insert("ts_nano", now);
1005
1006        let events = vec![Event::Log(log)];
1007
1008        let schema = Arc::new(Schema::new(vec![
1009            Field::new(
1010                "ts_second",
1011                DataType::Timestamp(TimeUnit::Second, None),
1012                true,
1013            ),
1014            Field::new(
1015                "ts_milli",
1016                DataType::Timestamp(TimeUnit::Millisecond, None),
1017                true,
1018            ),
1019            Field::new(
1020                "ts_micro",
1021                DataType::Timestamp(TimeUnit::Microsecond, None),
1022                true,
1023            ),
1024            Field::new(
1025                "ts_nano",
1026                DataType::Timestamp(TimeUnit::Nanosecond, None),
1027                true,
1028            ),
1029        ]));
1030
1031        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1032        assert!(result.is_ok());
1033
1034        let bytes = result.unwrap();
1035        let cursor = Cursor::new(bytes);
1036        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1037        let batch = reader.next().unwrap().unwrap();
1038
1039        assert_eq!(batch.num_rows(), 1);
1040        assert_eq!(batch.num_columns(), 4);
1041
1042        let ts_second = batch
1043            .column(0)
1044            .as_any()
1045            .downcast_ref::<TimestampSecondArray>()
1046            .unwrap();
1047        assert!(!ts_second.is_null(0));
1048        assert_eq!(ts_second.value(0), now.timestamp());
1049
1050        let ts_milli = batch
1051            .column(1)
1052            .as_any()
1053            .downcast_ref::<TimestampMillisecondArray>()
1054            .unwrap();
1055        assert!(!ts_milli.is_null(0));
1056        assert_eq!(ts_milli.value(0), now.timestamp_millis());
1057
1058        let ts_micro = batch
1059            .column(2)
1060            .as_any()
1061            .downcast_ref::<TimestampMicrosecondArray>()
1062            .unwrap();
1063        assert!(!ts_micro.is_null(0));
1064        assert_eq!(ts_micro.value(0), now.timestamp_micros());
1065
1066        let ts_nano = batch
1067            .column(3)
1068            .as_any()
1069            .downcast_ref::<TimestampNanosecondArray>()
1070            .unwrap();
1071        assert!(!ts_nano.is_null(0));
1072        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
1073    }
1074
1075    #[test]
1076    fn test_encode_mixed_timestamp_string_and_native() {
1077        // Test mixing string timestamps with native Timestamp values
1078        let mut log1 = LogEvent::default();
1079        log1.insert("ts", "2025-10-22T10:18:44.256Z"); // String
1080
1081        let mut log2 = LogEvent::default();
1082        log2.insert("ts", Utc::now()); // Native Timestamp
1083
1084        let mut log3 = LogEvent::default();
1085        log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
1086
1087        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1088
1089        let schema = Arc::new(Schema::new(vec![Field::new(
1090            "ts",
1091            DataType::Timestamp(TimeUnit::Nanosecond, None),
1092            true,
1093        )]));
1094
1095        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1096        assert!(result.is_ok());
1097
1098        let bytes = result.unwrap();
1099        let cursor = Cursor::new(bytes);
1100        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1101        let batch = reader.next().unwrap().unwrap();
1102
1103        assert_eq!(batch.num_rows(), 3);
1104
1105        let ts_array = batch
1106            .column(0)
1107            .as_any()
1108            .downcast_ref::<TimestampNanosecondArray>()
1109            .unwrap();
1110
1111        // All three should be non-null
1112        assert!(!ts_array.is_null(0));
1113        assert!(!ts_array.is_null(1));
1114        assert!(!ts_array.is_null(2));
1115
1116        // First one should match the parsed string
1117        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
1118            .unwrap()
1119            .timestamp_nanos_opt()
1120            .unwrap();
1121        assert_eq!(ts_array.value(0), expected);
1122
1123        // Third one should match the integer
1124        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
1125    }
1126
1127    #[test]
1128    fn test_encode_invalid_string_timestamp() {
1129        // Test that invalid timestamp strings become null
1130        let mut log1 = LogEvent::default();
1131        log1.insert("timestamp", "not-a-timestamp");
1132
1133        let mut log2 = LogEvent::default();
1134        log2.insert("timestamp", "2025-10-22T10:18:44.256Z"); // Valid
1135
1136        let mut log3 = LogEvent::default();
1137        log3.insert("timestamp", "2025-99-99T99:99:99Z"); // Invalid
1138
1139        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1140
1141        let schema = Arc::new(Schema::new(vec![Field::new(
1142            "timestamp",
1143            DataType::Timestamp(TimeUnit::Nanosecond, None),
1144            true,
1145        )]));
1146
1147        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1148        assert!(result.is_ok());
1149
1150        let bytes = result.unwrap();
1151        let cursor = Cursor::new(bytes);
1152        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1153        let batch = reader.next().unwrap().unwrap();
1154
1155        assert_eq!(batch.num_rows(), 3);
1156
1157        let ts_array = batch
1158            .column(0)
1159            .as_any()
1160            .downcast_ref::<TimestampNanosecondArray>()
1161            .unwrap();
1162
1163        // Invalid timestamps should be null
1164        assert!(ts_array.is_null(0));
1165        assert!(!ts_array.is_null(1)); // Valid one
1166        assert!(ts_array.is_null(2));
1167    }
1168
1169    #[test]
1170    fn test_encode_decimal128_from_integer() {
1171        use arrow::array::Decimal128Array;
1172
1173        let mut log = LogEvent::default();
1174        // Store quantity as integer: 1000
1175        log.insert("quantity", 1000_i64);
1176
1177        let events = vec![Event::Log(log)];
1178
1179        // Decimal(10, 3) - will represent 1000 as 1000.000
1180        let schema = Arc::new(Schema::new(vec![Field::new(
1181            "quantity",
1182            DataType::Decimal128(10, 3),
1183            true,
1184        )]));
1185
1186        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1187        assert!(result.is_ok());
1188
1189        let bytes = result.unwrap();
1190        let cursor = Cursor::new(bytes);
1191        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1192        let batch = reader.next().unwrap().unwrap();
1193
1194        assert_eq!(batch.num_rows(), 1);
1195
1196        let decimal_array = batch
1197            .column(0)
1198            .as_any()
1199            .downcast_ref::<Decimal128Array>()
1200            .unwrap();
1201
1202        assert!(!decimal_array.is_null(0));
1203        // 1000 with scale 3 = 1000 * 10^3 = 1000000
1204        assert_eq!(decimal_array.value(0), 1000000_i128);
1205    }
1206
1207    #[test]
1208    fn test_encode_decimal256() {
1209        use arrow::array::Decimal256Array;
1210
1211        let mut log = LogEvent::default();
1212        // Very large precision number
1213        log.insert("big_value", 123456789.123456_f64);
1214
1215        let events = vec![Event::Log(log)];
1216
1217        // Decimal256(50, 6) - high precision decimal
1218        let schema = Arc::new(Schema::new(vec![Field::new(
1219            "big_value",
1220            DataType::Decimal256(50, 6),
1221            true,
1222        )]));
1223
1224        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1225        assert!(result.is_ok());
1226
1227        let bytes = result.unwrap();
1228        let cursor = Cursor::new(bytes);
1229        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1230        let batch = reader.next().unwrap().unwrap();
1231
1232        assert_eq!(batch.num_rows(), 1);
1233
1234        let decimal_array = batch
1235            .column(0)
1236            .as_any()
1237            .downcast_ref::<Decimal256Array>()
1238            .unwrap();
1239
1240        assert!(!decimal_array.is_null(0));
1241        // Value should be non-null and encoded
1242        let value = decimal_array.value(0);
1243        assert!(value.to_i128().is_some());
1244    }
1245
1246    #[test]
1247    fn test_encode_decimal_null_values() {
1248        use arrow::array::Decimal128Array;
1249
1250        let mut log1 = LogEvent::default();
1251        log1.insert("price", 99.99_f64);
1252
1253        let log2 = LogEvent::default();
1254        // No price field - should be null
1255
1256        let mut log3 = LogEvent::default();
1257        log3.insert("price", 50.00_f64);
1258
1259        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1260
1261        let schema = Arc::new(Schema::new(vec![Field::new(
1262            "price",
1263            DataType::Decimal128(10, 2),
1264            true,
1265        )]));
1266
1267        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1268        assert!(result.is_ok());
1269
1270        let bytes = result.unwrap();
1271        let cursor = Cursor::new(bytes);
1272        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1273        let batch = reader.next().unwrap().unwrap();
1274
1275        assert_eq!(batch.num_rows(), 3);
1276
1277        let decimal_array = batch
1278            .column(0)
1279            .as_any()
1280            .downcast_ref::<Decimal128Array>()
1281            .unwrap();
1282
1283        // First row: 99.99
1284        assert!(!decimal_array.is_null(0));
1285        assert_eq!(decimal_array.value(0), 9999_i128);
1286
1287        // Second row: null
1288        assert!(decimal_array.is_null(1));
1289
1290        // Third row: 50.00
1291        assert!(!decimal_array.is_null(2));
1292        assert_eq!(decimal_array.value(2), 5000_i128);
1293    }
1294
1295    #[test]
1296    fn test_encode_unsigned_integer_types() {
1297        use arrow::array::{UInt8Array, UInt16Array, UInt32Array, UInt64Array};
1298
1299        let mut log = LogEvent::default();
1300        log.insert("uint8_field", 255_i64);
1301        log.insert("uint16_field", 65535_i64);
1302        log.insert("uint32_field", 4294967295_i64);
1303        log.insert("uint64_field", 9223372036854775807_i64);
1304
1305        let events = vec![Event::Log(log)];
1306
1307        let schema = Arc::new(Schema::new(vec![
1308            Field::new("uint8_field", DataType::UInt8, true),
1309            Field::new("uint16_field", DataType::UInt16, true),
1310            Field::new("uint32_field", DataType::UInt32, true),
1311            Field::new("uint64_field", DataType::UInt64, true),
1312        ]));
1313
1314        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1315        assert!(result.is_ok());
1316
1317        let bytes = result.unwrap();
1318        let cursor = Cursor::new(bytes);
1319        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1320        let batch = reader.next().unwrap().unwrap();
1321
1322        assert_eq!(batch.num_rows(), 1);
1323        assert_eq!(batch.num_columns(), 4);
1324
1325        // Verify uint8
1326        let uint8_array = batch
1327            .column(0)
1328            .as_any()
1329            .downcast_ref::<UInt8Array>()
1330            .unwrap();
1331        assert_eq!(uint8_array.value(0), 255_u8);
1332
1333        // Verify uint16
1334        let uint16_array = batch
1335            .column(1)
1336            .as_any()
1337            .downcast_ref::<UInt16Array>()
1338            .unwrap();
1339        assert_eq!(uint16_array.value(0), 65535_u16);
1340
1341        // Verify uint32
1342        let uint32_array = batch
1343            .column(2)
1344            .as_any()
1345            .downcast_ref::<UInt32Array>()
1346            .unwrap();
1347        assert_eq!(uint32_array.value(0), 4294967295_u32);
1348
1349        // Verify uint64
1350        let uint64_array = batch
1351            .column(3)
1352            .as_any()
1353            .downcast_ref::<UInt64Array>()
1354            .unwrap();
1355        assert_eq!(uint64_array.value(0), 9223372036854775807_u64);
1356    }
1357
1358    #[test]
1359    fn test_encode_unsigned_integers_with_null_and_overflow() {
1360        use arrow::array::{UInt8Array, UInt32Array};
1361
1362        let mut log1 = LogEvent::default();
1363        log1.insert("uint8_field", 100_i64);
1364        log1.insert("uint32_field", 1000_i64);
1365
1366        let mut log2 = LogEvent::default();
1367        log2.insert("uint8_field", 300_i64); // Overflow - should be null
1368        log2.insert("uint32_field", -1_i64); // Negative - should be null
1369
1370        let log3 = LogEvent::default();
1371        // Missing fields - should be null
1372
1373        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1374
1375        let schema = Arc::new(Schema::new(vec![
1376            Field::new("uint8_field", DataType::UInt8, true),
1377            Field::new("uint32_field", DataType::UInt32, true),
1378        ]));
1379
1380        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1381        assert!(result.is_ok());
1382
1383        let bytes = result.unwrap();
1384        let cursor = Cursor::new(bytes);
1385        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1386        let batch = reader.next().unwrap().unwrap();
1387
1388        assert_eq!(batch.num_rows(), 3);
1389
1390        // Check uint8 column
1391        let uint8_array = batch
1392            .column(0)
1393            .as_any()
1394            .downcast_ref::<UInt8Array>()
1395            .unwrap();
1396        assert_eq!(uint8_array.value(0), 100_u8); // Valid
1397        assert!(uint8_array.is_null(1)); // Overflow
1398        assert!(uint8_array.is_null(2)); // Missing
1399
1400        // Check uint32 column
1401        let uint32_array = batch
1402            .column(1)
1403            .as_any()
1404            .downcast_ref::<UInt32Array>()
1405            .unwrap();
1406        assert_eq!(uint32_array.value(0), 1000_u32); // Valid
1407        assert!(uint32_array.is_null(1)); // Negative
1408        assert!(uint32_array.is_null(2)); // Missing
1409    }
1410
1411    #[test]
1412    fn test_encode_non_nullable_field_with_null_value() {
1413        // Test that encoding fails when a non-nullable field encounters a null value
1414        let mut log1 = LogEvent::default();
1415        log1.insert("required_field", 42);
1416
1417        let log2 = LogEvent::default();
1418        // log2 is missing required_field - should cause an error
1419
1420        let events = vec![Event::Log(log1), Event::Log(log2)];
1421
1422        // Create schema with non-nullable field
1423        let schema = Arc::new(Schema::new(vec![Field::new(
1424            "required_field",
1425            DataType::Int64,
1426            false, // Not nullable
1427        )]));
1428
1429        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
1430        assert!(result.is_err());
1431
1432        match result.unwrap_err() {
1433            ArrowEncodingError::NullConstraint { field_name } => {
1434                assert_eq!(field_name, "required_field");
1435            }
1436            other => panic!("Expected NullConstraint error, got: {:?}", other),
1437        }
1438    }
1439
1440    #[test]
1441    fn test_encode_non_nullable_string_field_with_missing_value() {
1442        // Test that encoding fails for non-nullable string field
1443        let mut log1 = LogEvent::default();
1444        log1.insert("name", "Alice");
1445
1446        let mut log2 = LogEvent::default();
1447        log2.insert("name", "Bob");
1448
1449        let log3 = LogEvent::default();
1450        // log3 is missing name field
1451
1452        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1453
1454        let schema = Arc::new(Schema::new(vec![Field::new(
1455            "name",
1456            DataType::Utf8,
1457            false, // Not nullable
1458        )]));
1459
1460        let result = encode_events_to_arrow_ipc_stream(&events, Some(schema));
1461        assert!(result.is_err());
1462
1463        match result.unwrap_err() {
1464            ArrowEncodingError::NullConstraint { field_name } => {
1465                assert_eq!(field_name, "name");
1466            }
1467            other => panic!("Expected NullConstraint error, got: {:?}", other),
1468        }
1469    }
1470
1471    #[test]
1472    fn test_encode_non_nullable_field_all_values_present() {
1473        // Test that encoding succeeds when all values are present for non-nullable field
1474        let mut log1 = LogEvent::default();
1475        log1.insert("id", 1);
1476
1477        let mut log2 = LogEvent::default();
1478        log2.insert("id", 2);
1479
1480        let mut log3 = LogEvent::default();
1481        log3.insert("id", 3);
1482
1483        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
1484
1485        let schema = Arc::new(Schema::new(vec![Field::new(
1486            "id",
1487            DataType::Int64,
1488            false, // Not nullable
1489        )]));
1490
1491        let result = encode_events_to_arrow_ipc_stream(&events, Some(Arc::clone(&schema)));
1492        assert!(result.is_ok());
1493
1494        let bytes = result.unwrap();
1495        let cursor = Cursor::new(bytes);
1496        let mut reader = StreamReader::try_new(cursor, None).unwrap();
1497        let batch = reader.next().unwrap().unwrap();
1498
1499        assert_eq!(batch.num_rows(), 3);
1500
1501        let id_array = batch
1502            .column(0)
1503            .as_any()
1504            .downcast_ref::<Int64Array>()
1505            .unwrap();
1506
1507        assert_eq!(id_array.value(0), 1);
1508        assert_eq!(id_array.value(1), 2);
1509        assert_eq!(id_array.value(2), 3);
1510        assert!(!id_array.is_null(0));
1511        assert!(!id_array.is_null(1));
1512        assert!(!id_array.is_null(2));
1513    }
1514
1515    #[test]
1516    fn test_config_allow_nullable_fields_overrides_schema() {
1517        use tokio_util::codec::Encoder;
1518
1519        // Create events: One valid, one missing the "required" field
1520        let mut log1 = LogEvent::default();
1521        log1.insert("strict_field", 42);
1522        let log2 = LogEvent::default();
1523        let events = vec![Event::Log(log1), Event::Log(log2)];
1524
1525        let schema = Schema::new(vec![Field::new("strict_field", DataType::Int64, false)]);
1526
1527        let mut config = ArrowStreamSerializerConfig::new(schema);
1528        config.allow_nullable_fields = true;
1529
1530        let mut serializer =
1531            ArrowStreamSerializer::new(config).expect("Failed to create serializer");
1532
1533        let mut buffer = BytesMut::new();
1534        serializer
1535            .encode(events, &mut buffer)
1536            .expect("Encoding should succeed when allow_nullable_fields is true");
1537
1538        let cursor = Cursor::new(buffer);
1539        let mut reader = StreamReader::try_new(cursor, None).expect("Failed to create reader");
1540        let batch = reader.next().unwrap().expect("Failed to read batch");
1541
1542        assert_eq!(batch.num_rows(), 2);
1543
1544        let binding = batch.schema();
1545        let output_field = binding.field(0);
1546        assert!(
1547            output_field.is_nullable(),
1548            "The output schema field should have been transformed to nullable=true"
1549        );
1550
1551        let array = batch
1552            .column(0)
1553            .as_any()
1554            .downcast_ref::<Int64Array>()
1555            .unwrap();
1556
1557        assert_eq!(array.value(0), 42);
1558        assert!(!array.is_null(0));
1559        assert!(
1560            array.is_null(1),
1561            "The missing value should be encoded as null"
1562        );
1563    }
1564
1565    #[test]
1566    fn test_make_field_nullable_with_nested_types() {
1567        // Test that make_field_nullable recursively handles List and Struct types
1568
1569        // Create a nested structure: Struct containing a List of Structs
1570        // struct { inner_list: [{ nested_field: Int64 }] }
1571        let inner_struct_field = Field::new("nested_field", DataType::Int64, false);
1572        let inner_struct =
1573            DataType::Struct(arrow::datatypes::Fields::from(vec![inner_struct_field]));
1574        let list_field = Field::new("item", inner_struct, false);
1575        let list_type = DataType::List(Arc::new(list_field));
1576        let outer_field = Field::new("inner_list", list_type, false);
1577        let outer_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![outer_field]));
1578
1579        let original_field = Field::new("root", outer_struct, false);
1580
1581        // Apply make_field_nullable
1582        let nullable_field = make_field_nullable(&original_field);
1583
1584        // Verify root field is nullable
1585        assert!(
1586            nullable_field.is_nullable(),
1587            "Root field should be nullable"
1588        );
1589
1590        // Verify nested struct is nullable
1591        if let DataType::Struct(root_fields) = nullable_field.data_type() {
1592            let inner_list_field = &root_fields[0];
1593            assert!(
1594                inner_list_field.is_nullable(),
1595                "inner_list field should be nullable"
1596            );
1597
1598            // Verify list element is nullable
1599            if let DataType::List(list_item_field) = inner_list_field.data_type() {
1600                assert!(
1601                    list_item_field.is_nullable(),
1602                    "List item field should be nullable"
1603                );
1604
1605                // Verify inner struct fields are nullable
1606                if let DataType::Struct(inner_struct_fields) = list_item_field.data_type() {
1607                    let nested_field = &inner_struct_fields[0];
1608                    assert!(
1609                        nested_field.is_nullable(),
1610                        "nested_field should be nullable"
1611                    );
1612                } else {
1613                    panic!("Expected Struct type for list items");
1614                }
1615            } else {
1616                panic!("Expected List type for inner_list");
1617            }
1618        } else {
1619            panic!("Expected Struct type for root field");
1620        }
1621    }
1622
1623    #[test]
1624    fn test_make_field_nullable_with_map_type() {
1625        // Test that make_field_nullable handles Map types
1626        // Map is internally represented as List<Struct<key, value>>
1627
1628        // Create a map: Map<Utf8, Int64>
1629        // Internally: List<Struct<entries: {key: Utf8, value: Int64}>>
1630        let key_field = Field::new("key", DataType::Utf8, false);
1631        let value_field = Field::new("value", DataType::Int64, false);
1632        let entries_struct =
1633            DataType::Struct(arrow::datatypes::Fields::from(vec![key_field, value_field]));
1634        let entries_field = Field::new("entries", entries_struct, false);
1635        let map_type = DataType::Map(Arc::new(entries_field), false);
1636
1637        let original_field = Field::new("my_map", map_type, false);
1638
1639        // Apply make_field_nullable
1640        let nullable_field = make_field_nullable(&original_field);
1641
1642        // Verify root field is nullable
1643        assert!(
1644            nullable_field.is_nullable(),
1645            "Root map field should be nullable"
1646        );
1647
1648        // Verify map entries are nullable
1649        if let DataType::Map(entries_field, _sorted) = nullable_field.data_type() {
1650            assert!(
1651                entries_field.is_nullable(),
1652                "Map entries field should be nullable"
1653            );
1654
1655            // Verify the struct inside the map is nullable
1656            if let DataType::Struct(struct_fields) = entries_field.data_type() {
1657                let key_field = &struct_fields[0];
1658                let value_field = &struct_fields[1];
1659                assert!(key_field.is_nullable(), "Map key field should be nullable");
1660                assert!(
1661                    value_field.is_nullable(),
1662                    "Map value field should be nullable"
1663                );
1664            } else {
1665                panic!("Expected Struct type for map entries");
1666            }
1667        } else {
1668            panic!("Expected Map type for my_map field");
1669        }
1670    }
1671}