codecs/encoding/
serializer.rs

1//! Serializer configuration and implementation for encoding structured events as bytes.
2
3use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7use super::chunking::Chunker;
8use super::format::{
9    AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
10    CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
11    JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig,
12    NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig,
13    ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig,
14    TextSerializer, TextSerializerConfig,
15};
16#[cfg(feature = "opentelemetry")]
17use super::format::{OtlpSerializer, OtlpSerializerConfig};
18use super::framing::{
19    CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
20    VarintLengthDelimitedEncoderConfig,
21};
22
/// Serializer configuration.
// NOTE(review): the `///` text on this enum and its variants looks like it is consumed by
// `#[configurable_component]` to produce user-facing configuration docs -- confirm before
// rewording any of it.
#[configurable_component]
#[derive(Clone, Debug)]
// Internally tagged representation: the `codec` key selects the variant, and variant names are
// spelled in snake_case (for example `native_json` and `raw_message`).
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Encodes an event as an [Apache Avro][apache_avro] message.
    ///
    /// [apache_avro]: https://avro.apache.org/
    // Struct variant: the Avro options sit under an `avro` key next to the `codec` tag.
    Avro {
        /// Apache Avro-specific encoder options.
        avro: AvroSerializerOptions,
    },

    /// Encodes an event as a CEF (Common Event Format) formatted message.
    ///
    Cef(
        /// Options for the CEF encoder.
        CefSerializerConfig,
    ),

    /// Encodes an event as a CSV message.
    ///
    /// This codec must be configured with fields to encode.
    ///
    Csv(CsvSerializerConfig),

    /// Encodes an event as a [GELF][gelf] message.
    ///
    /// This codec is experimental for the following reason:
    ///
    /// The GELF specification is more strict than the actual Graylog receiver.
    /// Vector's encoder currently adheres more strictly to the GELF spec, with
    /// the exception that some characters such as `@`  are allowed in field names.
    ///
    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
    /// by Graylog and is much more relaxed than the GELF spec.
    ///
    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
    /// the codec might continue to relax the enforcement of the specification.
    ///
    /// [gelf]: https://docs.graylog.org/docs/gelf
    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
    Gelf(GelfSerializerConfig),

    /// Encodes an event as [JSON][json].
    ///
    /// [json]: https://www.json.org/
    Json(JsonSerializerConfig),

    /// Encodes an event as a [logfmt][logfmt] message.
    ///
    /// [logfmt]: https://brandur.org/logfmt
    // Unit variant: no codec-specific options are carried here.
    Logfmt,

    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    // Unit variant: no codec-specific options are carried here.
    Native,

    /// Encodes an event in the [native JSON format][vector_native_json].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    // Unit variant: no codec-specific options are carried here.
    NativeJson,

    /// Encodes an event in the [OTLP (OpenTelemetry Protocol)][otlp] format.
    ///
    /// This codec uses protobuf encoding, which is the recommended format for OTLP.
    /// The output is suitable for sending to OTLP-compatible endpoints with
    /// `content-type: application/x-protobuf`.
    ///
    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
    // This variant only exists when the crate is compiled with the `opentelemetry` feature, so
    // every `match` on this enum needs a correspondingly gated arm.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Encodes an event as a [Protobuf][protobuf] message.
    ///
    /// [protobuf]: https://protobuf.dev/
    Protobuf(ProtobufSerializerConfig),

    /// No encoding.
    ///
    /// This encoding uses the `message` field of a log event.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    // Unit variant: no codec-specific options are carried here.
    RawMessage,

    /// Plain text encoding.
    ///
    /// This encoding uses the `message` field of a log event. For metrics, it uses an
    /// encoding that resembles the Prometheus export format.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    Text(TextSerializerConfig),
}
128
129impl Default for SerializerConfig {
130    fn default() -> Self {
131        Self::Json(JsonSerializerConfig::default())
132    }
133}
134
135impl From<AvroSerializerConfig> for SerializerConfig {
136    fn from(config: AvroSerializerConfig) -> Self {
137        Self::Avro { avro: config.avro }
138    }
139}
140
141impl From<CefSerializerConfig> for SerializerConfig {
142    fn from(config: CefSerializerConfig) -> Self {
143        Self::Cef(config)
144    }
145}
146
147impl From<CsvSerializerConfig> for SerializerConfig {
148    fn from(config: CsvSerializerConfig) -> Self {
149        Self::Csv(config)
150    }
151}
152
153impl From<GelfSerializerConfig> for SerializerConfig {
154    fn from(config: GelfSerializerConfig) -> Self {
155        Self::Gelf(config)
156    }
157}
158
159impl From<JsonSerializerConfig> for SerializerConfig {
160    fn from(config: JsonSerializerConfig) -> Self {
161        Self::Json(config)
162    }
163}
164
165impl From<LogfmtSerializerConfig> for SerializerConfig {
166    fn from(_: LogfmtSerializerConfig) -> Self {
167        Self::Logfmt
168    }
169}
170
171impl From<NativeSerializerConfig> for SerializerConfig {
172    fn from(_: NativeSerializerConfig) -> Self {
173        Self::Native
174    }
175}
176
177impl From<NativeJsonSerializerConfig> for SerializerConfig {
178    fn from(_: NativeJsonSerializerConfig) -> Self {
179        Self::NativeJson
180    }
181}
182
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    /// Maps the OTLP config onto the `Otlp` variant; the argument itself is not used.
    fn from(_otlp: OtlpSerializerConfig) -> Self {
        SerializerConfig::Otlp
    }
}
189
190impl From<ProtobufSerializerConfig> for SerializerConfig {
191    fn from(config: ProtobufSerializerConfig) -> Self {
192        Self::Protobuf(config)
193    }
194}
195
196impl From<RawMessageSerializerConfig> for SerializerConfig {
197    fn from(_: RawMessageSerializerConfig) -> Self {
198        Self::RawMessage
199    }
200}
201
202impl From<TextSerializerConfig> for SerializerConfig {
203    fn from(config: TextSerializerConfig) -> Self {
204        Self::Text(config)
205    }
206}
207
208impl SerializerConfig {
209    /// Build the `Serializer` from this configuration.
210    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
211        match self {
212            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
213                AvroSerializerConfig::new(avro.schema.clone()).build()?,
214            )),
215            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
216            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
217            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
218            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
219            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
220            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
221            SerializerConfig::NativeJson => {
222                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
223            }
224            #[cfg(feature = "opentelemetry")]
225            SerializerConfig::Otlp => {
226                Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
227            }
228            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
229            SerializerConfig::RawMessage => {
230                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
231            }
232            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
233        }
234    }
235
236    /// Return an appropriate default framer for the given serializer.
237    pub fn default_stream_framing(&self) -> FramingConfig {
238        match self {
239            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
240            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
241            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
242            // our length delimited framer obviously will not do.
243            //
244            // This is OK for now, because the Avro serializer is more ceremonial than anything
245            // else, existing to curry serializer config options to Pulsar's native client, not to
246            // actually serialize the bytes themselves... but we're still exposing this method and
247            // we should do so accurately, even if practically it doesn't need to be.
248            //
249            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
250            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
251                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
252            }
253            #[cfg(feature = "opentelemetry")]
254            SerializerConfig::Otlp => FramingConfig::Bytes,
255            SerializerConfig::Protobuf(_) => {
256                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
257            }
258            SerializerConfig::Cef(_)
259            | SerializerConfig::Csv(_)
260            | SerializerConfig::Json(_)
261            | SerializerConfig::Logfmt
262            | SerializerConfig::NativeJson
263            | SerializerConfig::RawMessage
264            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
265            SerializerConfig::Gelf(_) => {
266                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
267            }
268        }
269    }
270
271    /// The data type of events that are accepted by this `Serializer`.
272    pub fn input_type(&self) -> DataType {
273        match self {
274            SerializerConfig::Avro { avro } => {
275                AvroSerializerConfig::new(avro.schema.clone()).input_type()
276            }
277            SerializerConfig::Cef(config) => config.input_type(),
278            SerializerConfig::Csv(config) => config.input_type(),
279            SerializerConfig::Gelf(config) => config.input_type(),
280            SerializerConfig::Json(config) => config.input_type(),
281            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
282            SerializerConfig::Native => NativeSerializerConfig.input_type(),
283            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
284            #[cfg(feature = "opentelemetry")]
285            SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
286            SerializerConfig::Protobuf(config) => config.input_type(),
287            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
288            SerializerConfig::Text(config) => config.input_type(),
289        }
290    }
291
292    /// The schema required by the serializer.
293    pub fn schema_requirement(&self) -> schema::Requirement {
294        match self {
295            SerializerConfig::Avro { avro } => {
296                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
297            }
298            SerializerConfig::Cef(config) => config.schema_requirement(),
299            SerializerConfig::Csv(config) => config.schema_requirement(),
300            SerializerConfig::Gelf(config) => config.schema_requirement(),
301            SerializerConfig::Json(config) => config.schema_requirement(),
302            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
303            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
304            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
305            #[cfg(feature = "opentelemetry")]
306            SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
307            SerializerConfig::Protobuf(config) => config.schema_requirement(),
308            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
309            SerializerConfig::Text(config) => config.schema_requirement(),
310        }
311    }
312}
313
/// Serialize structured events as bytes.
///
/// Each variant wraps the concrete serializer for one codec; `SerializerConfig::build`
/// constructs the variant that matches the configured codec.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an `AvroSerializer` for serialization.
    Avro(AvroSerializer),
    /// Uses a `CefSerializer` for serialization.
    Cef(CefSerializer),
    /// Uses a `CsvSerializer` for serialization.
    Csv(CsvSerializer),
    /// Uses a `GelfSerializer` for serialization.
    Gelf(GelfSerializer),
    /// Uses a `JsonSerializer` for serialization.
    Json(JsonSerializer),
    /// Uses a `LogfmtSerializer` for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a `NativeSerializer` for serialization.
    Native(NativeSerializer),
    /// Uses a `NativeJsonSerializer` for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses an `OtlpSerializer` for serialization.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Uses a `ProtobufSerializer` for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a `RawMessageSerializer` for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a `TextSerializer` for serialization.
    Text(TextSerializer),
}
343
344impl Serializer {
345    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
346    pub fn supports_json(&self) -> bool {
347        match self {
348            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
349            Serializer::Avro(_)
350            | Serializer::Cef(_)
351            | Serializer::Csv(_)
352            | Serializer::Logfmt(_)
353            | Serializer::Text(_)
354            | Serializer::Native(_)
355            | Serializer::Protobuf(_)
356            | Serializer::RawMessage(_) => false,
357            #[cfg(feature = "opentelemetry")]
358            Serializer::Otlp(_) => false,
359        }
360    }
361
362    /// Encode event and represent it as JSON value.
363    ///
364    /// # Panics
365    ///
366    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
367    /// if you need to determine the capability to encode to JSON at runtime.
368    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
369        match self {
370            Serializer::Gelf(serializer) => serializer.to_json_value(event),
371            Serializer::Json(serializer) => serializer.to_json_value(event),
372            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
373            Serializer::Avro(_)
374            | Serializer::Cef(_)
375            | Serializer::Csv(_)
376            | Serializer::Logfmt(_)
377            | Serializer::Text(_)
378            | Serializer::Native(_)
379            | Serializer::Protobuf(_)
380            | Serializer::RawMessage(_) => {
381                panic!("Serializer does not support JSON")
382            }
383            #[cfg(feature = "opentelemetry")]
384            Serializer::Otlp(_) => {
385                panic!("Serializer does not support JSON")
386            }
387        }
388    }
389
390    /// Returns the chunking implementation for the serializer, if any is supported.
391    pub fn chunker(&self) -> Option<Chunker> {
392        match self {
393            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
394            _ => None,
395        }
396    }
397
398    /// Returns whether the serializer produces binary output.
399    ///
400    /// Binary serializers produce raw bytes that should not be interpreted as text,
401    /// while text serializers produce UTF-8 encoded strings.
402    pub const fn is_binary(&self) -> bool {
403        match self {
404            Serializer::RawMessage(_)
405            | Serializer::Avro(_)
406            | Serializer::Native(_)
407            | Serializer::Protobuf(_) => true,
408            #[cfg(feature = "opentelemetry")]
409            Serializer::Otlp(_) => true,
410            Serializer::Cef(_)
411            | Serializer::Csv(_)
412            | Serializer::Logfmt(_)
413            | Serializer::Gelf(_)
414            | Serializer::Json(_)
415            | Serializer::Text(_)
416            | Serializer::NativeJson(_) => false,
417        }
418    }
419}
420
421impl From<AvroSerializer> for Serializer {
422    fn from(serializer: AvroSerializer) -> Self {
423        Self::Avro(serializer)
424    }
425}
426
427impl From<CefSerializer> for Serializer {
428    fn from(serializer: CefSerializer) -> Self {
429        Self::Cef(serializer)
430    }
431}
432
433impl From<CsvSerializer> for Serializer {
434    fn from(serializer: CsvSerializer) -> Self {
435        Self::Csv(serializer)
436    }
437}
438
439impl From<GelfSerializer> for Serializer {
440    fn from(serializer: GelfSerializer) -> Self {
441        Self::Gelf(serializer)
442    }
443}
444
445impl From<JsonSerializer> for Serializer {
446    fn from(serializer: JsonSerializer) -> Self {
447        Self::Json(serializer)
448    }
449}
450
451impl From<LogfmtSerializer> for Serializer {
452    fn from(serializer: LogfmtSerializer) -> Self {
453        Self::Logfmt(serializer)
454    }
455}
456
457impl From<NativeSerializer> for Serializer {
458    fn from(serializer: NativeSerializer) -> Self {
459        Self::Native(serializer)
460    }
461}
462
463impl From<NativeJsonSerializer> for Serializer {
464    fn from(serializer: NativeJsonSerializer) -> Self {
465        Self::NativeJson(serializer)
466    }
467}
468
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    /// Wraps an `OtlpSerializer` in the `Otlp` variant.
    fn from(otlp: OtlpSerializer) -> Self {
        Serializer::Otlp(otlp)
    }
}
475
476impl From<ProtobufSerializer> for Serializer {
477    fn from(serializer: ProtobufSerializer) -> Self {
478        Self::Protobuf(serializer)
479    }
480}
481
482impl From<RawMessageSerializer> for Serializer {
483    fn from(serializer: RawMessageSerializer) -> Self {
484        Self::RawMessage(serializer)
485    }
486}
487
488impl From<TextSerializer> for Serializer {
489    fn from(serializer: TextSerializer) -> Self {
490        Self::Text(serializer)
491    }
492}
493
494impl tokio_util::codec::Encoder<Event> for Serializer {
495    type Error = vector_common::Error;
496
497    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
498        match self {
499            Serializer::Avro(serializer) => serializer.encode(event, buffer),
500            Serializer::Cef(serializer) => serializer.encode(event, buffer),
501            Serializer::Csv(serializer) => serializer.encode(event, buffer),
502            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
503            Serializer::Json(serializer) => serializer.encode(event, buffer),
504            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
505            Serializer::Native(serializer) => serializer.encode(event, buffer),
506            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
507            #[cfg(feature = "opentelemetry")]
508            Serializer::Otlp(serializer) => serializer.encode(event, buffer),
509            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
510            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
511            Serializer::Text(serializer) => serializer.encode(event, buffer),
512        }
513    }
514}
515
#[cfg(test)]
mod tests {
    use super::*;

    /// With no codec specified, the configuration falls back to JSON.
    #[test]
    fn test_serializer_config_default() {
        assert!(matches!(
            SerializerConfig::default(),
            SerializerConfig::Json(_)
        ));
    }

    /// JSON output is text, while native and raw-message output is classified as binary.
    #[test]
    fn test_serializer_is_binary() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        let native = Serializer::Native(NativeSerializerConfig.build());
        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());

        assert!(!json.is_binary());
        assert!(native.is_binary());
        assert!(raw_message.is_binary());
    }

    /// The JSON serializer can produce a JSON value; the text serializer cannot.
    #[test]
    fn test_serializer_supports_json() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        let text = Serializer::Text(TextSerializerConfig::default().build());

        assert!(json.supports_json());
        assert!(!text.supports_json());
    }

    /// Building a JSON configuration succeeds and yields the JSON serializer.
    #[test]
    fn test_serializer_config_build() {
        let built = SerializerConfig::Json(JsonSerializerConfig::default()).build();

        assert!(built.is_ok());
        assert!(matches!(built.unwrap(), Serializer::Json(_)));
    }

    /// JSON defaults to newline-delimited framing; native defaults to length-delimited framing.
    #[test]
    fn test_serializer_config_default_framing() {
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}