1pub mod chunking;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::BytesMut;
11pub use chunking::{Chunker, Chunking, GelfChunker};
12pub use format::{
13 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
14 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
15 JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
16 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
17 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
18 ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
19 TextSerializerConfig,
20};
21pub use framing::{
22 BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
23 CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
24 LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
25 VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig,
26};
27use vector_config::configurable_component;
28use vector_core::{config::DataType, event::Event, schema};
29
30pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
32
33#[derive(Debug)]
35pub enum Error {
36 FramingError(BoxedFramingError),
38 SerializingError(vector_common::Error),
40}
41
42impl std::fmt::Display for Error {
43 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
46 Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
47 }
48 }
49}
50
51impl std::error::Error for Error {}
52
53impl From<std::io::Error> for Error {
54 fn from(error: std::io::Error) -> Self {
55 Self::FramingError(Box::new(error))
56 }
57}
58
59#[configurable_component]
61#[derive(Clone, Debug, Eq, PartialEq)]
62#[serde(tag = "method", rename_all = "snake_case")]
63#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
64pub enum FramingConfig {
65 Bytes,
67
68 CharacterDelimited(CharacterDelimitedEncoderConfig),
70
71 LengthDelimited(LengthDelimitedEncoderConfig),
75
76 NewlineDelimited,
78
79 VarintLengthDelimited(VarintLengthDelimitedEncoderConfig),
83}
84
85impl From<BytesEncoderConfig> for FramingConfig {
86 fn from(_: BytesEncoderConfig) -> Self {
87 Self::Bytes
88 }
89}
90
91impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
92 fn from(config: CharacterDelimitedEncoderConfig) -> Self {
93 Self::CharacterDelimited(config)
94 }
95}
96
97impl From<LengthDelimitedEncoderConfig> for FramingConfig {
98 fn from(config: LengthDelimitedEncoderConfig) -> Self {
99 Self::LengthDelimited(config)
100 }
101}
102
103impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
104 fn from(_: NewlineDelimitedEncoderConfig) -> Self {
105 Self::NewlineDelimited
106 }
107}
108
109impl From<VarintLengthDelimitedEncoderConfig> for FramingConfig {
110 fn from(config: VarintLengthDelimitedEncoderConfig) -> Self {
111 Self::VarintLengthDelimited(config)
112 }
113}
114
115impl FramingConfig {
116 pub fn build(&self) -> Framer {
118 match self {
119 FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
120 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
121 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
122 FramingConfig::NewlineDelimited => {
123 Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
124 }
125 FramingConfig::VarintLengthDelimited(config) => {
126 Framer::VarintLengthDelimited(config.build())
127 }
128 }
129 }
130}
131
132#[derive(Debug, Clone)]
134pub enum Framer {
135 Bytes(BytesEncoder),
137 CharacterDelimited(CharacterDelimitedEncoder),
139 LengthDelimited(LengthDelimitedEncoder),
141 NewlineDelimited(NewlineDelimitedEncoder),
143 VarintLengthDelimited(VarintLengthDelimitedEncoder),
145 Boxed(BoxedFramer),
147}
148
149impl From<BytesEncoder> for Framer {
150 fn from(encoder: BytesEncoder) -> Self {
151 Self::Bytes(encoder)
152 }
153}
154
155impl From<CharacterDelimitedEncoder> for Framer {
156 fn from(encoder: CharacterDelimitedEncoder) -> Self {
157 Self::CharacterDelimited(encoder)
158 }
159}
160
161impl From<LengthDelimitedEncoder> for Framer {
162 fn from(encoder: LengthDelimitedEncoder) -> Self {
163 Self::LengthDelimited(encoder)
164 }
165}
166
167impl From<NewlineDelimitedEncoder> for Framer {
168 fn from(encoder: NewlineDelimitedEncoder) -> Self {
169 Self::NewlineDelimited(encoder)
170 }
171}
172
173impl From<VarintLengthDelimitedEncoder> for Framer {
174 fn from(encoder: VarintLengthDelimitedEncoder) -> Self {
175 Self::VarintLengthDelimited(encoder)
176 }
177}
178
179impl From<BoxedFramer> for Framer {
180 fn from(encoder: BoxedFramer) -> Self {
181 Self::Boxed(encoder)
182 }
183}
184
185impl tokio_util::codec::Encoder<()> for Framer {
186 type Error = BoxedFramingError;
187
188 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
189 match self {
190 Framer::Bytes(framer) => framer.encode((), buffer),
191 Framer::CharacterDelimited(framer) => framer.encode((), buffer),
192 Framer::LengthDelimited(framer) => framer.encode((), buffer),
193 Framer::NewlineDelimited(framer) => framer.encode((), buffer),
194 Framer::VarintLengthDelimited(framer) => framer.encode((), buffer),
195 Framer::Boxed(framer) => framer.encode((), buffer),
196 }
197 }
198}
199
200#[configurable_component]
202#[derive(Clone, Debug)]
203#[serde(tag = "codec", rename_all = "snake_case")]
204#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
205pub enum SerializerConfig {
206 Avro {
210 avro: AvroSerializerOptions,
212 },
213
214 Cef(
217 CefSerializerConfig,
219 ),
220
221 Csv(CsvSerializerConfig),
226
227 Gelf(GelfSerializerConfig),
244
245 Json(JsonSerializerConfig),
249
250 Logfmt,
254
255 Native,
262
263 NativeJson,
270
271 Protobuf(ProtobufSerializerConfig),
275
276 RawMessage,
284
285 Text(TextSerializerConfig),
294}
295
296impl From<AvroSerializerConfig> for SerializerConfig {
297 fn from(config: AvroSerializerConfig) -> Self {
298 Self::Avro { avro: config.avro }
299 }
300}
301
302impl From<CefSerializerConfig> for SerializerConfig {
303 fn from(config: CefSerializerConfig) -> Self {
304 Self::Cef(config)
305 }
306}
307
308impl From<CsvSerializerConfig> for SerializerConfig {
309 fn from(config: CsvSerializerConfig) -> Self {
310 Self::Csv(config)
311 }
312}
313
314impl From<GelfSerializerConfig> for SerializerConfig {
315 fn from(config: GelfSerializerConfig) -> Self {
316 Self::Gelf(config)
317 }
318}
319
320impl From<JsonSerializerConfig> for SerializerConfig {
321 fn from(config: JsonSerializerConfig) -> Self {
322 Self::Json(config)
323 }
324}
325
326impl From<LogfmtSerializerConfig> for SerializerConfig {
327 fn from(_: LogfmtSerializerConfig) -> Self {
328 Self::Logfmt
329 }
330}
331
332impl From<NativeSerializerConfig> for SerializerConfig {
333 fn from(_: NativeSerializerConfig) -> Self {
334 Self::Native
335 }
336}
337
338impl From<NativeJsonSerializerConfig> for SerializerConfig {
339 fn from(_: NativeJsonSerializerConfig) -> Self {
340 Self::NativeJson
341 }
342}
343
344impl From<ProtobufSerializerConfig> for SerializerConfig {
345 fn from(config: ProtobufSerializerConfig) -> Self {
346 Self::Protobuf(config)
347 }
348}
349
350impl From<RawMessageSerializerConfig> for SerializerConfig {
351 fn from(_: RawMessageSerializerConfig) -> Self {
352 Self::RawMessage
353 }
354}
355
356impl From<TextSerializerConfig> for SerializerConfig {
357 fn from(config: TextSerializerConfig) -> Self {
358 Self::Text(config)
359 }
360}
361
362impl SerializerConfig {
363 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
365 match self {
366 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
367 AvroSerializerConfig::new(avro.schema.clone()).build()?,
368 )),
369 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
370 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
371 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
372 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
373 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
374 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
375 SerializerConfig::NativeJson => {
376 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
377 }
378 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
379 SerializerConfig::RawMessage => {
380 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
381 }
382 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
383 }
384 }
385
386 pub fn default_stream_framing(&self) -> FramingConfig {
388 match self {
389 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
401 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
402 }
403 SerializerConfig::Protobuf(_) => {
404 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
405 }
406 SerializerConfig::Cef(_)
407 | SerializerConfig::Csv(_)
408 | SerializerConfig::Json(_)
409 | SerializerConfig::Logfmt
410 | SerializerConfig::NativeJson
411 | SerializerConfig::RawMessage
412 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
413 SerializerConfig::Gelf(_) => {
414 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
415 }
416 }
417 }
418
419 pub fn input_type(&self) -> DataType {
421 match self {
422 SerializerConfig::Avro { avro } => {
423 AvroSerializerConfig::new(avro.schema.clone()).input_type()
424 }
425 SerializerConfig::Cef(config) => config.input_type(),
426 SerializerConfig::Csv(config) => config.input_type(),
427 SerializerConfig::Gelf(config) => config.input_type(),
428 SerializerConfig::Json(config) => config.input_type(),
429 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
430 SerializerConfig::Native => NativeSerializerConfig.input_type(),
431 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
432 SerializerConfig::Protobuf(config) => config.input_type(),
433 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
434 SerializerConfig::Text(config) => config.input_type(),
435 }
436 }
437
438 pub fn schema_requirement(&self) -> schema::Requirement {
440 match self {
441 SerializerConfig::Avro { avro } => {
442 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
443 }
444 SerializerConfig::Cef(config) => config.schema_requirement(),
445 SerializerConfig::Csv(config) => config.schema_requirement(),
446 SerializerConfig::Gelf(config) => config.schema_requirement(),
447 SerializerConfig::Json(config) => config.schema_requirement(),
448 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
449 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
450 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
451 SerializerConfig::Protobuf(config) => config.schema_requirement(),
452 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
453 SerializerConfig::Text(config) => config.schema_requirement(),
454 }
455 }
456}
457
458#[derive(Debug, Clone)]
460pub enum Serializer {
461 Avro(AvroSerializer),
463 Cef(CefSerializer),
465 Csv(CsvSerializer),
467 Gelf(GelfSerializer),
469 Json(JsonSerializer),
471 Logfmt(LogfmtSerializer),
473 Native(NativeSerializer),
475 NativeJson(NativeJsonSerializer),
477 Protobuf(ProtobufSerializer),
479 RawMessage(RawMessageSerializer),
481 Text(TextSerializer),
483}
484
485impl Serializer {
486 pub fn supports_json(&self) -> bool {
488 match self {
489 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
490 Serializer::Avro(_)
491 | Serializer::Cef(_)
492 | Serializer::Csv(_)
493 | Serializer::Logfmt(_)
494 | Serializer::Text(_)
495 | Serializer::Native(_)
496 | Serializer::Protobuf(_)
497 | Serializer::RawMessage(_) => false,
498 }
499 }
500
501 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
508 match self {
509 Serializer::Gelf(serializer) => serializer.to_json_value(event),
510 Serializer::Json(serializer) => serializer.to_json_value(event),
511 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
512 Serializer::Avro(_)
513 | Serializer::Cef(_)
514 | Serializer::Csv(_)
515 | Serializer::Logfmt(_)
516 | Serializer::Text(_)
517 | Serializer::Native(_)
518 | Serializer::Protobuf(_)
519 | Serializer::RawMessage(_) => {
520 panic!("Serializer does not support JSON")
521 }
522 }
523 }
524
525 pub fn chunker(&self) -> Option<Chunker> {
527 match self {
528 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
529 _ => None,
530 }
531 }
532}
533
534impl From<AvroSerializer> for Serializer {
535 fn from(serializer: AvroSerializer) -> Self {
536 Self::Avro(serializer)
537 }
538}
539
540impl From<CefSerializer> for Serializer {
541 fn from(serializer: CefSerializer) -> Self {
542 Self::Cef(serializer)
543 }
544}
545
546impl From<CsvSerializer> for Serializer {
547 fn from(serializer: CsvSerializer) -> Self {
548 Self::Csv(serializer)
549 }
550}
551
552impl From<GelfSerializer> for Serializer {
553 fn from(serializer: GelfSerializer) -> Self {
554 Self::Gelf(serializer)
555 }
556}
557
558impl From<JsonSerializer> for Serializer {
559 fn from(serializer: JsonSerializer) -> Self {
560 Self::Json(serializer)
561 }
562}
563
564impl From<LogfmtSerializer> for Serializer {
565 fn from(serializer: LogfmtSerializer) -> Self {
566 Self::Logfmt(serializer)
567 }
568}
569
570impl From<NativeSerializer> for Serializer {
571 fn from(serializer: NativeSerializer) -> Self {
572 Self::Native(serializer)
573 }
574}
575
576impl From<NativeJsonSerializer> for Serializer {
577 fn from(serializer: NativeJsonSerializer) -> Self {
578 Self::NativeJson(serializer)
579 }
580}
581
582impl From<ProtobufSerializer> for Serializer {
583 fn from(serializer: ProtobufSerializer) -> Self {
584 Self::Protobuf(serializer)
585 }
586}
587
588impl From<RawMessageSerializer> for Serializer {
589 fn from(serializer: RawMessageSerializer) -> Self {
590 Self::RawMessage(serializer)
591 }
592}
593
594impl From<TextSerializer> for Serializer {
595 fn from(serializer: TextSerializer) -> Self {
596 Self::Text(serializer)
597 }
598}
599
600impl tokio_util::codec::Encoder<Event> for Serializer {
601 type Error = vector_common::Error;
602
603 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
604 match self {
605 Serializer::Avro(serializer) => serializer.encode(event, buffer),
606 Serializer::Cef(serializer) => serializer.encode(event, buffer),
607 Serializer::Csv(serializer) => serializer.encode(event, buffer),
608 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
609 Serializer::Json(serializer) => serializer.encode(event, buffer),
610 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
611 Serializer::Native(serializer) => serializer.encode(event, buffer),
612 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
613 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
614 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
615 Serializer::Text(serializer) => serializer.encode(event, buffer),
616 }
617 }
618}