1use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11use super::{
12 chunking::Chunker,
13 format::{
14 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
15 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
16 GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
17 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
18 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
19 RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
20 },
21 framing::{
22 CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
23 VarintLengthDelimitedEncoderConfig,
24 },
25};
26
/// Configuration that selects which event serializer to build.
///
/// The variant is chosen by the `codec` field, whose value is the snake_case
/// form of the variant name (for example `native_json` or `raw_message`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Selects the Avro codec.
    Avro {
        /// Avro-specific options. The `schema` stored here is what the Avro
        /// serializer configuration is constructed from.
        avro: AvroSerializerOptions,
    },

    /// Selects the CEF codec.
    Cef(
        /// Options for the CEF serializer.
        CefSerializerConfig,
    ),

    /// Selects the CSV codec, configured by the wrapped options.
    Csv(CsvSerializerConfig),

    /// Selects the GELF codec, configured by the wrapped options.
    Gelf(GelfSerializerConfig),

    /// Selects the JSON codec, configured by the wrapped options.
    ///
    /// This is the codec returned by `SerializerConfig::default()`.
    Json(JsonSerializerConfig),

    /// Selects the logfmt codec. This variant carries no options.
    Logfmt,

    /// Selects the native codec. This variant carries no options.
    Native,

    /// Selects the native JSON codec. This variant carries no options.
    NativeJson,

    /// Selects the OTLP codec. This variant carries no options.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Selects the Protobuf codec, configured by the wrapped options.
    Protobuf(ProtobufSerializerConfig),

    /// Selects the raw message codec. This variant carries no options.
    RawMessage,

    /// Selects the text codec, configured by the wrapped options.
    Text(TextSerializerConfig),
}
132
133impl Default for SerializerConfig {
134 fn default() -> Self {
135 Self::Json(JsonSerializerConfig::default())
136 }
137}
138
/// Configuration that selects which batch serializer to build.
///
/// The variant is chosen by the `codec` field. Every variant is gated behind a
/// Cargo feature, so the set of accepted codecs depends on the enabled
/// features.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum BatchSerializerConfig {
    /// Selects the Arrow stream codec (`codec = "arrow_stream"`), configured
    /// by the wrapped options.
    ///
    /// Only available when the `arrow` feature is enabled.
    #[cfg(feature = "arrow")]
    #[serde(rename = "arrow_stream")]
    ArrowStream(ArrowStreamSerializerConfig),
}
157
#[cfg(feature = "arrow")]
impl BatchSerializerConfig {
    /// Builds the batch serializer described by this configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ArrowStreamSerializer::new` reports for the
    /// cloned configuration.
    pub fn build(
        &self,
    ) -> Result<ArrowStreamSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            Self::ArrowStream(settings) => {
                // The constructor takes the configuration by value, so hand it a copy.
                let owned_settings = settings.clone();
                ArrowStreamSerializer::new(owned_settings)
            }
        }
    }

    /// Returns the input data type reported by the selected codec's configuration.
    pub fn input_type(&self) -> DataType {
        match self {
            Self::ArrowStream(settings) => settings.input_type(),
        }
    }

    /// Returns the schema requirement reported by the selected codec's configuration.
    pub fn schema_requirement(&self) -> schema::Requirement {
        match self {
            Self::ArrowStream(settings) => settings.schema_requirement(),
        }
    }
}
185
186impl From<AvroSerializerConfig> for SerializerConfig {
187 fn from(config: AvroSerializerConfig) -> Self {
188 Self::Avro { avro: config.avro }
189 }
190}
191
192impl From<CefSerializerConfig> for SerializerConfig {
193 fn from(config: CefSerializerConfig) -> Self {
194 Self::Cef(config)
195 }
196}
197
198impl From<CsvSerializerConfig> for SerializerConfig {
199 fn from(config: CsvSerializerConfig) -> Self {
200 Self::Csv(config)
201 }
202}
203
204impl From<GelfSerializerConfig> for SerializerConfig {
205 fn from(config: GelfSerializerConfig) -> Self {
206 Self::Gelf(config)
207 }
208}
209
210impl From<JsonSerializerConfig> for SerializerConfig {
211 fn from(config: JsonSerializerConfig) -> Self {
212 Self::Json(config)
213 }
214}
215
216impl From<LogfmtSerializerConfig> for SerializerConfig {
217 fn from(_: LogfmtSerializerConfig) -> Self {
218 Self::Logfmt
219 }
220}
221
222impl From<NativeSerializerConfig> for SerializerConfig {
223 fn from(_: NativeSerializerConfig) -> Self {
224 Self::Native
225 }
226}
227
228impl From<NativeJsonSerializerConfig> for SerializerConfig {
229 fn from(_: NativeJsonSerializerConfig) -> Self {
230 Self::NativeJson
231 }
232}
233
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    /// Maps to the `Otlp` unit variant; the argument itself is discarded.
    fn from(_unused: OtlpSerializerConfig) -> Self {
        SerializerConfig::Otlp
    }
}
240
241impl From<ProtobufSerializerConfig> for SerializerConfig {
242 fn from(config: ProtobufSerializerConfig) -> Self {
243 Self::Protobuf(config)
244 }
245}
246
247impl From<RawMessageSerializerConfig> for SerializerConfig {
248 fn from(_: RawMessageSerializerConfig) -> Self {
249 Self::RawMessage
250 }
251}
252
253impl From<TextSerializerConfig> for SerializerConfig {
254 fn from(config: TextSerializerConfig) -> Self {
255 Self::Text(config)
256 }
257}
258
259impl SerializerConfig {
260 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
262 match self {
263 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
264 AvroSerializerConfig::new(avro.schema.clone()).build()?,
265 )),
266 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
267 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
268 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
269 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
270 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
271 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
272 SerializerConfig::NativeJson => {
273 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
274 }
275 #[cfg(feature = "opentelemetry")]
276 SerializerConfig::Otlp => {
277 Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
278 }
279 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
280 SerializerConfig::RawMessage => {
281 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
282 }
283 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
284 }
285 }
286
287 pub fn default_stream_framing(&self) -> FramingConfig {
289 match self {
290 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
302 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
303 }
304 #[cfg(feature = "opentelemetry")]
305 SerializerConfig::Otlp => FramingConfig::Bytes,
306 SerializerConfig::Protobuf(_) => {
307 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
308 }
309 SerializerConfig::Cef(_)
310 | SerializerConfig::Csv(_)
311 | SerializerConfig::Json(_)
312 | SerializerConfig::Logfmt
313 | SerializerConfig::NativeJson
314 | SerializerConfig::RawMessage
315 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
316 SerializerConfig::Gelf(_) => {
317 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
318 }
319 }
320 }
321
322 pub fn input_type(&self) -> DataType {
324 match self {
325 SerializerConfig::Avro { avro } => {
326 AvroSerializerConfig::new(avro.schema.clone()).input_type()
327 }
328 SerializerConfig::Cef(config) => config.input_type(),
329 SerializerConfig::Csv(config) => config.input_type(),
330 SerializerConfig::Gelf(config) => config.input_type(),
331 SerializerConfig::Json(config) => config.input_type(),
332 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
333 SerializerConfig::Native => NativeSerializerConfig.input_type(),
334 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
335 #[cfg(feature = "opentelemetry")]
336 SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
337 SerializerConfig::Protobuf(config) => config.input_type(),
338 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
339 SerializerConfig::Text(config) => config.input_type(),
340 }
341 }
342
343 pub fn schema_requirement(&self) -> schema::Requirement {
345 match self {
346 SerializerConfig::Avro { avro } => {
347 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
348 }
349 SerializerConfig::Cef(config) => config.schema_requirement(),
350 SerializerConfig::Csv(config) => config.schema_requirement(),
351 SerializerConfig::Gelf(config) => config.schema_requirement(),
352 SerializerConfig::Json(config) => config.schema_requirement(),
353 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
354 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
355 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
356 #[cfg(feature = "opentelemetry")]
357 SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
358 SerializerConfig::Protobuf(config) => config.schema_requirement(),
359 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
360 SerializerConfig::Text(config) => config.schema_requirement(),
361 }
362 }
363}
364
/// A built serializer, with one variant per supported codec.
///
/// Each variant wraps the concrete serializer that performs the encoding.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an [`AvroSerializer`] for serialization.
    Avro(AvroSerializer),
    /// Uses a [`CefSerializer`] for serialization.
    Cef(CefSerializer),
    /// Uses a [`CsvSerializer`] for serialization.
    Csv(CsvSerializer),
    /// Uses a [`GelfSerializer`] for serialization.
    Gelf(GelfSerializer),
    /// Uses a [`JsonSerializer`] for serialization.
    Json(JsonSerializer),
    /// Uses a [`LogfmtSerializer`] for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a [`NativeSerializer`] for serialization.
    Native(NativeSerializer),
    /// Uses a [`NativeJsonSerializer`] for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses an [`OtlpSerializer`] for serialization.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Uses a [`ProtobufSerializer`] for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a [`RawMessageSerializer`] for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a [`TextSerializer`] for serialization.
    Text(TextSerializer),
}
394
395impl Serializer {
396 pub fn supports_json(&self) -> bool {
398 match self {
399 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
400 Serializer::Avro(_)
401 | Serializer::Cef(_)
402 | Serializer::Csv(_)
403 | Serializer::Logfmt(_)
404 | Serializer::Text(_)
405 | Serializer::Native(_)
406 | Serializer::Protobuf(_)
407 | Serializer::RawMessage(_) => false,
408 #[cfg(feature = "opentelemetry")]
409 Serializer::Otlp(_) => false,
410 }
411 }
412
413 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
420 match self {
421 Serializer::Gelf(serializer) => serializer.to_json_value(event),
422 Serializer::Json(serializer) => serializer.to_json_value(event),
423 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
424 Serializer::Avro(_)
425 | Serializer::Cef(_)
426 | Serializer::Csv(_)
427 | Serializer::Logfmt(_)
428 | Serializer::Text(_)
429 | Serializer::Native(_)
430 | Serializer::Protobuf(_)
431 | Serializer::RawMessage(_) => {
432 panic!("Serializer does not support JSON")
433 }
434 #[cfg(feature = "opentelemetry")]
435 Serializer::Otlp(_) => {
436 panic!("Serializer does not support JSON")
437 }
438 }
439 }
440
441 pub fn chunker(&self) -> Option<Chunker> {
443 match self {
444 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
445 _ => None,
446 }
447 }
448
449 pub const fn is_binary(&self) -> bool {
454 match self {
455 Serializer::RawMessage(_)
456 | Serializer::Avro(_)
457 | Serializer::Native(_)
458 | Serializer::Protobuf(_) => true,
459 #[cfg(feature = "opentelemetry")]
460 Serializer::Otlp(_) => true,
461 Serializer::Cef(_)
462 | Serializer::Csv(_)
463 | Serializer::Logfmt(_)
464 | Serializer::Gelf(_)
465 | Serializer::Json(_)
466 | Serializer::Text(_)
467 | Serializer::NativeJson(_) => false,
468 }
469 }
470}
471
472impl From<AvroSerializer> for Serializer {
473 fn from(serializer: AvroSerializer) -> Self {
474 Self::Avro(serializer)
475 }
476}
477
478impl From<CefSerializer> for Serializer {
479 fn from(serializer: CefSerializer) -> Self {
480 Self::Cef(serializer)
481 }
482}
483
484impl From<CsvSerializer> for Serializer {
485 fn from(serializer: CsvSerializer) -> Self {
486 Self::Csv(serializer)
487 }
488}
489
490impl From<GelfSerializer> for Serializer {
491 fn from(serializer: GelfSerializer) -> Self {
492 Self::Gelf(serializer)
493 }
494}
495
496impl From<JsonSerializer> for Serializer {
497 fn from(serializer: JsonSerializer) -> Self {
498 Self::Json(serializer)
499 }
500}
501
502impl From<LogfmtSerializer> for Serializer {
503 fn from(serializer: LogfmtSerializer) -> Self {
504 Self::Logfmt(serializer)
505 }
506}
507
508impl From<NativeSerializer> for Serializer {
509 fn from(serializer: NativeSerializer) -> Self {
510 Self::Native(serializer)
511 }
512}
513
514impl From<NativeJsonSerializer> for Serializer {
515 fn from(serializer: NativeJsonSerializer) -> Self {
516 Self::NativeJson(serializer)
517 }
518}
519
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    /// Wraps the given serializer in the `Otlp` variant.
    fn from(inner: OtlpSerializer) -> Self {
        Serializer::Otlp(inner)
    }
}
526
527impl From<ProtobufSerializer> for Serializer {
528 fn from(serializer: ProtobufSerializer) -> Self {
529 Self::Protobuf(serializer)
530 }
531}
532
533impl From<RawMessageSerializer> for Serializer {
534 fn from(serializer: RawMessageSerializer) -> Self {
535 Self::RawMessage(serializer)
536 }
537}
538
539impl From<TextSerializer> for Serializer {
540 fn from(serializer: TextSerializer) -> Self {
541 Self::Text(serializer)
542 }
543}
544
545impl tokio_util::codec::Encoder<Event> for Serializer {
546 type Error = vector_common::Error;
547
548 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
549 match self {
550 Serializer::Avro(serializer) => serializer.encode(event, buffer),
551 Serializer::Cef(serializer) => serializer.encode(event, buffer),
552 Serializer::Csv(serializer) => serializer.encode(event, buffer),
553 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
554 Serializer::Json(serializer) => serializer.encode(event, buffer),
555 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
556 Serializer::Native(serializer) => serializer.encode(event, buffer),
557 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
558 #[cfg(feature = "opentelemetry")]
559 Serializer::Otlp(serializer) => serializer.encode(event, buffer),
560 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
561 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
562 Serializer::Text(serializer) => serializer.encode(event, buffer),
563 }
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects the JSON codec.
    #[test]
    fn test_serializer_config_default() {
        let default_config = SerializerConfig::default();
        assert!(matches!(default_config, SerializerConfig::Json(_)));
    }

    /// JSON is not binary; native and raw message are.
    #[test]
    fn test_serializer_is_binary() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(!json.is_binary());

        let native = Serializer::Native(NativeSerializerConfig.build());
        assert!(native.is_binary());

        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());
        assert!(raw_message.is_binary());
    }

    /// JSON supports JSON values; text does not.
    #[test]
    fn test_serializer_supports_json() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(json.supports_json());

        let text = Serializer::Text(TextSerializerConfig::default().build());
        assert!(!text.supports_json());
    }

    /// Building a JSON configuration succeeds and yields the JSON serializer.
    #[test]
    fn test_serializer_config_build() {
        let built = SerializerConfig::Json(JsonSerializerConfig::default()).build();
        assert!(built.is_ok());
        assert!(matches!(built.unwrap(), Serializer::Json(_)));
    }

    /// JSON defaults to newline-delimited framing; native to length-delimited.
    #[test]
    fn test_serializer_config_default_framing() {
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}