codecs/encoding/
serializer.rs

1//! Serializer configuration and implementation for encoding structured events as bytes.
2
3use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "syslog")]
12use super::format::{SyslogSerializer, SyslogSerializerConfig};
13use super::{
14    chunking::Chunker,
15    format::{
16        AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
17        CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
18        GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
19        LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
20        NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
21        RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
22    },
23    framing::{
24        CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
25        VarintLengthDelimitedEncoderConfig,
26    },
27};
28
29/// Serializer configuration.
30#[configurable_component]
31#[derive(Clone, Debug)]
32#[serde(tag = "codec", rename_all = "snake_case")]
33#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
34pub enum SerializerConfig {
35    /// Encodes an event as an [Apache Avro][apache_avro] message.
36    ///
37    /// [apache_avro]: https://avro.apache.org/
38    Avro {
39        /// Apache Avro-specific encoder options.
40        avro: AvroSerializerOptions,
41    },
42
43    /// Encodes an event as a CEF (Common Event Format) formatted message.
44    ///
45    Cef(
46        /// Options for the CEF encoder.
47        CefSerializerConfig,
48    ),
49
50    /// Encodes an event as a CSV message.
51    ///
52    /// This codec must be configured with fields to encode.
53    ///
54    Csv(CsvSerializerConfig),
55
56    /// Encodes an event as a [GELF][gelf] message.
57    ///
58    /// This codec is experimental for the following reason:
59    ///
60    /// The GELF specification is more strict than the actual Graylog receiver.
61    /// Vector's encoder currently adheres more strictly to the GELF spec, with
62    /// the exception that some characters such as `@`  are allowed in field names.
63    ///
64    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
65    /// by Graylog and is much more relaxed than the GELF spec.
66    ///
67    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
68    /// the codec might continue to relax the enforcement of the specification.
69    ///
70    /// [gelf]: https://docs.graylog.org/docs/gelf
71    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
72    Gelf(GelfSerializerConfig),
73
74    /// Encodes an event as [JSON][json].
75    ///
76    /// [json]: https://www.json.org/
77    Json(JsonSerializerConfig),
78
79    /// Encodes an event as a [logfmt][logfmt] message.
80    ///
81    /// [logfmt]: https://brandur.org/logfmt
82    Logfmt,
83
84    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
85    ///
86    /// This codec is **[experimental][experimental]**.
87    ///
88    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
89    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
90    Native,
91
92    /// Encodes an event in the [native JSON format][vector_native_json].
93    ///
94    /// This codec is **[experimental][experimental]**.
95    ///
96    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
97    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
98    NativeJson,
99
100    /// Encodes an event in the [OTLP (OpenTelemetry Protocol)][otlp] format.
101    ///
102    /// This codec uses protobuf encoding, which is the recommended format for OTLP.
103    /// The output is suitable for sending to OTLP-compatible endpoints with
104    /// `content-type: application/x-protobuf`.
105    ///
106    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
107    #[cfg(feature = "opentelemetry")]
108    Otlp,
109
110    /// Encodes an event as a [Protobuf][protobuf] message.
111    ///
112    /// [protobuf]: https://protobuf.dev/
113    Protobuf(ProtobufSerializerConfig),
114
115    /// No encoding.
116    ///
117    /// This encoding uses the `message` field of a log event.
118    ///
119    /// Be careful if you are modifying your log events (for example, by using a `remap`
120    /// transform) and removing the message field while doing additional parsing on it, as this
121    /// could lead to the encoding emitting empty strings for the given event.
122    RawMessage,
123
124    /// Plain text encoding.
125    ///
126    /// This encoding uses the `message` field of a log event. For metrics, it uses an
127    /// encoding that resembles the Prometheus export format.
128    ///
129    /// Be careful if you are modifying your log events (for example, by using a `remap`
130    /// transform) and removing the message field while doing additional parsing on it, as this
131    /// could lead to the encoding emitting empty strings for the given event.
132    Text(TextSerializerConfig),
133
134    /// Syslog encoding
135    /// RFC 3164 and 5424 are supported
136    #[cfg(feature = "syslog")]
137    Syslog(SyslogSerializerConfig),
138}
139
140impl Default for SerializerConfig {
141    fn default() -> Self {
142        Self::Json(JsonSerializerConfig::default())
143    }
144}
145
146/// Batch serializer configuration.
147#[configurable_component]
148#[derive(Clone, Debug)]
149#[serde(tag = "codec", rename_all = "snake_case")]
150#[configurable(metadata(
151    docs::enum_tag_description = "The codec to use for batch encoding events."
152))]
153pub enum BatchSerializerConfig {
154    /// Encodes events in [Apache Arrow][apache_arrow] IPC streaming format.
155    ///
156    /// This is the streaming variant of the Arrow IPC format, which writes
157    /// a continuous stream of record batches.
158    ///
159    /// [apache_arrow]: https://arrow.apache.org/
160    #[cfg(feature = "arrow")]
161    #[serde(rename = "arrow_stream")]
162    ArrowStream(ArrowStreamSerializerConfig),
163}
164
165#[cfg(feature = "arrow")]
166impl BatchSerializerConfig {
167    /// Build the `ArrowStreamSerializer` from this configuration.
168    pub fn build(
169        &self,
170    ) -> Result<ArrowStreamSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
171        match self {
172            BatchSerializerConfig::ArrowStream(arrow_config) => {
173                ArrowStreamSerializer::new(arrow_config.clone())
174            }
175        }
176    }
177
178    /// The data type of events that are accepted by this batch serializer.
179    pub fn input_type(&self) -> DataType {
180        match self {
181            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(),
182        }
183    }
184
185    /// The schema required by the batch serializer.
186    pub fn schema_requirement(&self) -> schema::Requirement {
187        match self {
188            BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(),
189        }
190    }
191}
192
193impl From<AvroSerializerConfig> for SerializerConfig {
194    fn from(config: AvroSerializerConfig) -> Self {
195        Self::Avro { avro: config.avro }
196    }
197}
198
199impl From<CefSerializerConfig> for SerializerConfig {
200    fn from(config: CefSerializerConfig) -> Self {
201        Self::Cef(config)
202    }
203}
204
205impl From<CsvSerializerConfig> for SerializerConfig {
206    fn from(config: CsvSerializerConfig) -> Self {
207        Self::Csv(config)
208    }
209}
210
211impl From<GelfSerializerConfig> for SerializerConfig {
212    fn from(config: GelfSerializerConfig) -> Self {
213        Self::Gelf(config)
214    }
215}
216
217impl From<JsonSerializerConfig> for SerializerConfig {
218    fn from(config: JsonSerializerConfig) -> Self {
219        Self::Json(config)
220    }
221}
222
223impl From<LogfmtSerializerConfig> for SerializerConfig {
224    fn from(_: LogfmtSerializerConfig) -> Self {
225        Self::Logfmt
226    }
227}
228
229impl From<NativeSerializerConfig> for SerializerConfig {
230    fn from(_: NativeSerializerConfig) -> Self {
231        Self::Native
232    }
233}
234
235impl From<NativeJsonSerializerConfig> for SerializerConfig {
236    fn from(_: NativeJsonSerializerConfig) -> Self {
237        Self::NativeJson
238    }
239}
240
241#[cfg(feature = "opentelemetry")]
242impl From<OtlpSerializerConfig> for SerializerConfig {
243    fn from(_: OtlpSerializerConfig) -> Self {
244        Self::Otlp
245    }
246}
247
248impl From<ProtobufSerializerConfig> for SerializerConfig {
249    fn from(config: ProtobufSerializerConfig) -> Self {
250        Self::Protobuf(config)
251    }
252}
253
254impl From<RawMessageSerializerConfig> for SerializerConfig {
255    fn from(_: RawMessageSerializerConfig) -> Self {
256        Self::RawMessage
257    }
258}
259
260impl From<TextSerializerConfig> for SerializerConfig {
261    fn from(config: TextSerializerConfig) -> Self {
262        Self::Text(config)
263    }
264}
265
266impl SerializerConfig {
267    /// Build the `Serializer` from this configuration.
268    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
269        match self {
270            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
271                AvroSerializerConfig::new(avro.schema.clone()).build()?,
272            )),
273            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
274            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
275            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
276            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
277            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
278            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
279            SerializerConfig::NativeJson => {
280                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
281            }
282            #[cfg(feature = "opentelemetry")]
283            SerializerConfig::Otlp => {
284                Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
285            }
286            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
287            SerializerConfig::RawMessage => {
288                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
289            }
290            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
291            #[cfg(feature = "syslog")]
292            SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
293        }
294    }
295
296    /// Return an appropriate default framer for the given serializer.
297    pub fn default_stream_framing(&self) -> FramingConfig {
298        match self {
299            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
300            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
301            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
302            // our length delimited framer obviously will not do.
303            //
304            // This is OK for now, because the Avro serializer is more ceremonial than anything
305            // else, existing to curry serializer config options to Pulsar's native client, not to
306            // actually serialize the bytes themselves... but we're still exposing this method and
307            // we should do so accurately, even if practically it doesn't need to be.
308            //
309            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
310            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
311                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
312            }
313            #[cfg(feature = "opentelemetry")]
314            SerializerConfig::Otlp => FramingConfig::Bytes,
315            SerializerConfig::Protobuf(_) => {
316                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
317            }
318            SerializerConfig::Cef(_)
319            | SerializerConfig::Csv(_)
320            | SerializerConfig::Json(_)
321            | SerializerConfig::Logfmt
322            | SerializerConfig::NativeJson
323            | SerializerConfig::RawMessage
324            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
325            #[cfg(feature = "syslog")]
326            SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
327            SerializerConfig::Gelf(_) => {
328                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
329            }
330        }
331    }
332
333    /// The data type of events that are accepted by this `Serializer`.
334    pub fn input_type(&self) -> DataType {
335        match self {
336            SerializerConfig::Avro { avro } => {
337                AvroSerializerConfig::new(avro.schema.clone()).input_type()
338            }
339            SerializerConfig::Cef(config) => config.input_type(),
340            SerializerConfig::Csv(config) => config.input_type(),
341            SerializerConfig::Gelf(config) => config.input_type(),
342            SerializerConfig::Json(config) => config.input_type(),
343            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
344            SerializerConfig::Native => NativeSerializerConfig.input_type(),
345            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
346            #[cfg(feature = "opentelemetry")]
347            SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
348            SerializerConfig::Protobuf(config) => config.input_type(),
349            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
350            SerializerConfig::Text(config) => config.input_type(),
351            #[cfg(feature = "syslog")]
352            SerializerConfig::Syslog(config) => config.input_type(),
353        }
354    }
355
356    /// The schema required by the serializer.
357    pub fn schema_requirement(&self) -> schema::Requirement {
358        match self {
359            SerializerConfig::Avro { avro } => {
360                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
361            }
362            SerializerConfig::Cef(config) => config.schema_requirement(),
363            SerializerConfig::Csv(config) => config.schema_requirement(),
364            SerializerConfig::Gelf(config) => config.schema_requirement(),
365            SerializerConfig::Json(config) => config.schema_requirement(),
366            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
367            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
368            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
369            #[cfg(feature = "opentelemetry")]
370            SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
371            SerializerConfig::Protobuf(config) => config.schema_requirement(),
372            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
373            SerializerConfig::Text(config) => config.schema_requirement(),
374            #[cfg(feature = "syslog")]
375            SerializerConfig::Syslog(config) => config.schema_requirement(),
376        }
377    }
378}
379
380/// Serialize structured events as bytes.
381#[derive(Debug, Clone)]
382pub enum Serializer {
383    /// Uses an `AvroSerializer` for serialization.
384    Avro(AvroSerializer),
385    /// Uses a `CefSerializer` for serialization.
386    Cef(CefSerializer),
387    /// Uses a `CsvSerializer` for serialization.
388    Csv(CsvSerializer),
389    /// Uses a `GelfSerializer` for serialization.
390    Gelf(GelfSerializer),
391    /// Uses a `JsonSerializer` for serialization.
392    Json(JsonSerializer),
393    /// Uses a `LogfmtSerializer` for serialization.
394    Logfmt(LogfmtSerializer),
395    /// Uses a `NativeSerializer` for serialization.
396    Native(NativeSerializer),
397    /// Uses a `NativeJsonSerializer` for serialization.
398    NativeJson(NativeJsonSerializer),
399    /// Uses an `OtlpSerializer` for serialization.
400    #[cfg(feature = "opentelemetry")]
401    Otlp(OtlpSerializer),
402    /// Uses a `ProtobufSerializer` for serialization.
403    Protobuf(ProtobufSerializer),
404    /// Uses a `RawMessageSerializer` for serialization.
405    RawMessage(RawMessageSerializer),
406    /// Uses a `TextSerializer` for serialization.
407    Text(TextSerializer),
408    /// Uses a `SyslogSerializer` for serialization.
409    #[cfg(feature = "syslog")]
410    Syslog(SyslogSerializer),
411}
412
413impl Serializer {
414    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
415    pub fn supports_json(&self) -> bool {
416        match self {
417            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
418            Serializer::Avro(_)
419            | Serializer::Cef(_)
420            | Serializer::Csv(_)
421            | Serializer::Logfmt(_)
422            | Serializer::Text(_)
423            | Serializer::Native(_)
424            | Serializer::Protobuf(_)
425            | Serializer::RawMessage(_) => false,
426            #[cfg(feature = "syslog")]
427            Serializer::Syslog(_) => false,
428            #[cfg(feature = "opentelemetry")]
429            Serializer::Otlp(_) => false,
430        }
431    }
432
433    /// Encode event and represent it as JSON value.
434    ///
435    /// # Panics
436    ///
437    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
438    /// if you need to determine the capability to encode to JSON at runtime.
439    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
440        match self {
441            Serializer::Gelf(serializer) => serializer.to_json_value(event),
442            Serializer::Json(serializer) => serializer.to_json_value(event),
443            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
444            Serializer::Avro(_)
445            | Serializer::Cef(_)
446            | Serializer::Csv(_)
447            | Serializer::Logfmt(_)
448            | Serializer::Text(_)
449            | Serializer::Native(_)
450            | Serializer::Protobuf(_)
451            | Serializer::RawMessage(_) => {
452                panic!("Serializer does not support JSON")
453            }
454            #[cfg(feature = "syslog")]
455            Serializer::Syslog(_) => {
456                panic!("Serializer does not support JSON")
457            }
458            #[cfg(feature = "opentelemetry")]
459            Serializer::Otlp(_) => {
460                panic!("Serializer does not support JSON")
461            }
462        }
463    }
464
465    /// Returns the chunking implementation for the serializer, if any is supported.
466    pub fn chunker(&self) -> Option<Chunker> {
467        match self {
468            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
469            _ => None,
470        }
471    }
472
473    /// Returns whether the serializer produces binary output.
474    ///
475    /// Binary serializers produce raw bytes that should not be interpreted as text,
476    /// while text serializers produce UTF-8 encoded strings.
477    pub const fn is_binary(&self) -> bool {
478        match self {
479            Serializer::RawMessage(_)
480            | Serializer::Avro(_)
481            | Serializer::Native(_)
482            | Serializer::Protobuf(_) => true,
483            #[cfg(feature = "opentelemetry")]
484            Serializer::Otlp(_) => true,
485            #[cfg(feature = "syslog")]
486            Serializer::Syslog(_) => false,
487            Serializer::Cef(_)
488            | Serializer::Csv(_)
489            | Serializer::Logfmt(_)
490            | Serializer::Gelf(_)
491            | Serializer::Json(_)
492            | Serializer::Text(_)
493            | Serializer::NativeJson(_) => false,
494        }
495    }
496}
497
498impl From<AvroSerializer> for Serializer {
499    fn from(serializer: AvroSerializer) -> Self {
500        Self::Avro(serializer)
501    }
502}
503
504impl From<CefSerializer> for Serializer {
505    fn from(serializer: CefSerializer) -> Self {
506        Self::Cef(serializer)
507    }
508}
509
510impl From<CsvSerializer> for Serializer {
511    fn from(serializer: CsvSerializer) -> Self {
512        Self::Csv(serializer)
513    }
514}
515
516impl From<GelfSerializer> for Serializer {
517    fn from(serializer: GelfSerializer) -> Self {
518        Self::Gelf(serializer)
519    }
520}
521
522impl From<JsonSerializer> for Serializer {
523    fn from(serializer: JsonSerializer) -> Self {
524        Self::Json(serializer)
525    }
526}
527
528impl From<LogfmtSerializer> for Serializer {
529    fn from(serializer: LogfmtSerializer) -> Self {
530        Self::Logfmt(serializer)
531    }
532}
533
534impl From<NativeSerializer> for Serializer {
535    fn from(serializer: NativeSerializer) -> Self {
536        Self::Native(serializer)
537    }
538}
539
540impl From<NativeJsonSerializer> for Serializer {
541    fn from(serializer: NativeJsonSerializer) -> Self {
542        Self::NativeJson(serializer)
543    }
544}
545
546#[cfg(feature = "opentelemetry")]
547impl From<OtlpSerializer> for Serializer {
548    fn from(serializer: OtlpSerializer) -> Self {
549        Self::Otlp(serializer)
550    }
551}
552
553impl From<ProtobufSerializer> for Serializer {
554    fn from(serializer: ProtobufSerializer) -> Self {
555        Self::Protobuf(serializer)
556    }
557}
558
559impl From<RawMessageSerializer> for Serializer {
560    fn from(serializer: RawMessageSerializer) -> Self {
561        Self::RawMessage(serializer)
562    }
563}
564
565impl From<TextSerializer> for Serializer {
566    fn from(serializer: TextSerializer) -> Self {
567        Self::Text(serializer)
568    }
569}
570#[cfg(feature = "syslog")]
571impl From<SyslogSerializer> for Serializer {
572    fn from(serializer: SyslogSerializer) -> Self {
573        Self::Syslog(serializer)
574    }
575}
576
577impl tokio_util::codec::Encoder<Event> for Serializer {
578    type Error = vector_common::Error;
579
580    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
581        match self {
582            Serializer::Avro(serializer) => serializer.encode(event, buffer),
583            Serializer::Cef(serializer) => serializer.encode(event, buffer),
584            Serializer::Csv(serializer) => serializer.encode(event, buffer),
585            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
586            Serializer::Json(serializer) => serializer.encode(event, buffer),
587            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
588            Serializer::Native(serializer) => serializer.encode(event, buffer),
589            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
590            #[cfg(feature = "opentelemetry")]
591            Serializer::Otlp(serializer) => serializer.encode(event, buffer),
592            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
593            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
594            Serializer::Text(serializer) => serializer.encode(event, buffer),
595            #[cfg(feature = "syslog")]
596            Serializer::Syslog(serializer) => serializer.encode(event, buffer),
597        }
598    }
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604
605    #[test]
606    fn test_serializer_config_default() {
607        // SerializerConfig should default to Json
608        let config = SerializerConfig::default();
609        assert!(matches!(config, SerializerConfig::Json(_)));
610    }
611
612    #[test]
613    fn test_serializer_is_binary() {
614        // Test that is_binary correctly identifies binary serializers
615        let json_config = JsonSerializerConfig::default();
616        let json_serializer = Serializer::Json(json_config.build());
617        assert!(!json_serializer.is_binary());
618
619        let native_serializer = Serializer::Native(NativeSerializerConfig.build());
620        assert!(native_serializer.is_binary());
621
622        let raw_message_serializer = Serializer::RawMessage(RawMessageSerializerConfig.build());
623        assert!(raw_message_serializer.is_binary());
624    }
625
626    #[test]
627    fn test_serializer_supports_json() {
628        // Test that supports_json correctly identifies JSON-capable serializers
629        let json_config = JsonSerializerConfig::default();
630        let json_serializer = Serializer::Json(json_config.build());
631        assert!(json_serializer.supports_json());
632
633        let text_config = TextSerializerConfig::default();
634        let text_serializer = Serializer::Text(text_config.build());
635        assert!(!text_serializer.supports_json());
636    }
637
638    #[test]
639    fn test_serializer_config_build() {
640        // Test that SerializerConfig can be built successfully
641        let config = SerializerConfig::Json(JsonSerializerConfig::default());
642        let serializer = config.build();
643        assert!(serializer.is_ok());
644        assert!(matches!(serializer.unwrap(), Serializer::Json(_)));
645    }
646
647    #[test]
648    fn test_serializer_config_default_framing() {
649        // Test that default framing is appropriate for each serializer type
650        let json_config = SerializerConfig::Json(JsonSerializerConfig::default());
651        assert!(matches!(
652            json_config.default_stream_framing(),
653            FramingConfig::NewlineDelimited
654        ));
655
656        let native_config = SerializerConfig::Native;
657        assert!(matches!(
658            native_config.default_stream_framing(),
659            FramingConfig::LengthDelimited(_)
660        ));
661    }
662}