1pub mod format;
5pub mod framing;
6
7use std::fmt::Debug;
8
9use bytes::BytesMut;
10pub use format::{
11 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
12 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
13 JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
14 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
15 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
16 ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
17 TextSerializerConfig,
18};
19pub use framing::{
20 BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
21 CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
22 LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
23};
24use vector_config::configurable_component;
25use vector_core::{config::DataType, event::Event, schema};
26
27pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
29
30#[derive(Debug)]
32pub enum Error {
33 FramingError(BoxedFramingError),
35 SerializingError(vector_common::Error),
37}
38
39impl std::fmt::Display for Error {
40 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
43 Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
44 }
45 }
46}
47
48impl std::error::Error for Error {}
49
50impl From<std::io::Error> for Error {
51 fn from(error: std::io::Error) -> Self {
52 Self::FramingError(Box::new(error))
53 }
54}
55
56#[configurable_component]
58#[derive(Clone, Debug, Eq, PartialEq)]
59#[serde(tag = "method", rename_all = "snake_case")]
60#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
61pub enum FramingConfig {
62 Bytes,
64
65 CharacterDelimited(CharacterDelimitedEncoderConfig),
67
68 LengthDelimited(LengthDelimitedEncoderConfig),
72
73 NewlineDelimited,
75}
76
77impl From<BytesEncoderConfig> for FramingConfig {
78 fn from(_: BytesEncoderConfig) -> Self {
79 Self::Bytes
80 }
81}
82
83impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
84 fn from(config: CharacterDelimitedEncoderConfig) -> Self {
85 Self::CharacterDelimited(config)
86 }
87}
88
89impl From<LengthDelimitedEncoderConfig> for FramingConfig {
90 fn from(config: LengthDelimitedEncoderConfig) -> Self {
91 Self::LengthDelimited(config)
92 }
93}
94
95impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
96 fn from(_: NewlineDelimitedEncoderConfig) -> Self {
97 Self::NewlineDelimited
98 }
99}
100
101impl FramingConfig {
102 pub fn build(&self) -> Framer {
104 match self {
105 FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
106 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
107 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
108 FramingConfig::NewlineDelimited => {
109 Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
110 }
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
117pub enum Framer {
118 Bytes(BytesEncoder),
120 CharacterDelimited(CharacterDelimitedEncoder),
122 LengthDelimited(LengthDelimitedEncoder),
124 NewlineDelimited(NewlineDelimitedEncoder),
126 Boxed(BoxedFramer),
128}
129
130impl From<BytesEncoder> for Framer {
131 fn from(encoder: BytesEncoder) -> Self {
132 Self::Bytes(encoder)
133 }
134}
135
136impl From<CharacterDelimitedEncoder> for Framer {
137 fn from(encoder: CharacterDelimitedEncoder) -> Self {
138 Self::CharacterDelimited(encoder)
139 }
140}
141
142impl From<LengthDelimitedEncoder> for Framer {
143 fn from(encoder: LengthDelimitedEncoder) -> Self {
144 Self::LengthDelimited(encoder)
145 }
146}
147
148impl From<NewlineDelimitedEncoder> for Framer {
149 fn from(encoder: NewlineDelimitedEncoder) -> Self {
150 Self::NewlineDelimited(encoder)
151 }
152}
153
154impl From<BoxedFramer> for Framer {
155 fn from(encoder: BoxedFramer) -> Self {
156 Self::Boxed(encoder)
157 }
158}
159
160impl tokio_util::codec::Encoder<()> for Framer {
161 type Error = BoxedFramingError;
162
163 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
164 match self {
165 Framer::Bytes(framer) => framer.encode((), buffer),
166 Framer::CharacterDelimited(framer) => framer.encode((), buffer),
167 Framer::LengthDelimited(framer) => framer.encode((), buffer),
168 Framer::NewlineDelimited(framer) => framer.encode((), buffer),
169 Framer::Boxed(framer) => framer.encode((), buffer),
170 }
171 }
172}
173
174#[configurable_component]
176#[derive(Clone, Debug)]
177#[serde(tag = "codec", rename_all = "snake_case")]
178#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
179pub enum SerializerConfig {
180 Avro {
184 avro: AvroSerializerOptions,
186 },
187
188 Cef(
191 CefSerializerConfig,
193 ),
194
195 Csv(CsvSerializerConfig),
200
201 Gelf,
218
219 Json(JsonSerializerConfig),
223
224 Logfmt,
228
229 Native,
236
237 NativeJson,
244
245 Protobuf(ProtobufSerializerConfig),
249
250 RawMessage,
258
259 Text(TextSerializerConfig),
268}
269
270impl From<AvroSerializerConfig> for SerializerConfig {
271 fn from(config: AvroSerializerConfig) -> Self {
272 Self::Avro { avro: config.avro }
273 }
274}
275
276impl From<CefSerializerConfig> for SerializerConfig {
277 fn from(config: CefSerializerConfig) -> Self {
278 Self::Cef(config)
279 }
280}
281
282impl From<CsvSerializerConfig> for SerializerConfig {
283 fn from(config: CsvSerializerConfig) -> Self {
284 Self::Csv(config)
285 }
286}
287
288impl From<GelfSerializerConfig> for SerializerConfig {
289 fn from(_: GelfSerializerConfig) -> Self {
290 Self::Gelf
291 }
292}
293
294impl From<JsonSerializerConfig> for SerializerConfig {
295 fn from(config: JsonSerializerConfig) -> Self {
296 Self::Json(config)
297 }
298}
299
300impl From<LogfmtSerializerConfig> for SerializerConfig {
301 fn from(_: LogfmtSerializerConfig) -> Self {
302 Self::Logfmt
303 }
304}
305
306impl From<NativeSerializerConfig> for SerializerConfig {
307 fn from(_: NativeSerializerConfig) -> Self {
308 Self::Native
309 }
310}
311
312impl From<NativeJsonSerializerConfig> for SerializerConfig {
313 fn from(_: NativeJsonSerializerConfig) -> Self {
314 Self::NativeJson
315 }
316}
317
318impl From<ProtobufSerializerConfig> for SerializerConfig {
319 fn from(config: ProtobufSerializerConfig) -> Self {
320 Self::Protobuf(config)
321 }
322}
323
324impl From<RawMessageSerializerConfig> for SerializerConfig {
325 fn from(_: RawMessageSerializerConfig) -> Self {
326 Self::RawMessage
327 }
328}
329
330impl From<TextSerializerConfig> for SerializerConfig {
331 fn from(config: TextSerializerConfig) -> Self {
332 Self::Text(config)
333 }
334}
335
336impl SerializerConfig {
337 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
339 match self {
340 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
341 AvroSerializerConfig::new(avro.schema.clone()).build()?,
342 )),
343 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
344 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
345 SerializerConfig::Gelf => Ok(Serializer::Gelf(GelfSerializerConfig::new().build())),
346 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
347 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
348 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
349 SerializerConfig::NativeJson => {
350 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
351 }
352 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
353 SerializerConfig::RawMessage => {
354 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
355 }
356 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
357 }
358 }
359
360 pub fn default_stream_framing(&self) -> FramingConfig {
362 match self {
363 SerializerConfig::Avro { .. }
375 | SerializerConfig::Native
376 | SerializerConfig::Protobuf(_) => {
377 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
378 }
379 SerializerConfig::Cef(_)
380 | SerializerConfig::Csv(_)
381 | SerializerConfig::Json(_)
382 | SerializerConfig::Logfmt
383 | SerializerConfig::NativeJson
384 | SerializerConfig::RawMessage
385 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
386 SerializerConfig::Gelf => {
387 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
388 }
389 }
390 }
391
392 pub fn input_type(&self) -> DataType {
394 match self {
395 SerializerConfig::Avro { avro } => {
396 AvroSerializerConfig::new(avro.schema.clone()).input_type()
397 }
398 SerializerConfig::Cef(config) => config.input_type(),
399 SerializerConfig::Csv(config) => config.input_type(),
400 SerializerConfig::Gelf => GelfSerializerConfig::input_type(),
401 SerializerConfig::Json(config) => config.input_type(),
402 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
403 SerializerConfig::Native => NativeSerializerConfig.input_type(),
404 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
405 SerializerConfig::Protobuf(config) => config.input_type(),
406 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
407 SerializerConfig::Text(config) => config.input_type(),
408 }
409 }
410
411 pub fn schema_requirement(&self) -> schema::Requirement {
413 match self {
414 SerializerConfig::Avro { avro } => {
415 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
416 }
417 SerializerConfig::Cef(config) => config.schema_requirement(),
418 SerializerConfig::Csv(config) => config.schema_requirement(),
419 SerializerConfig::Gelf => GelfSerializerConfig::schema_requirement(),
420 SerializerConfig::Json(config) => config.schema_requirement(),
421 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
422 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
423 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
424 SerializerConfig::Protobuf(config) => config.schema_requirement(),
425 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
426 SerializerConfig::Text(config) => config.schema_requirement(),
427 }
428 }
429}
430
431#[derive(Debug, Clone)]
433pub enum Serializer {
434 Avro(AvroSerializer),
436 Cef(CefSerializer),
438 Csv(CsvSerializer),
440 Gelf(GelfSerializer),
442 Json(JsonSerializer),
444 Logfmt(LogfmtSerializer),
446 Native(NativeSerializer),
448 NativeJson(NativeJsonSerializer),
450 Protobuf(ProtobufSerializer),
452 RawMessage(RawMessageSerializer),
454 Text(TextSerializer),
456}
457
458impl Serializer {
459 pub fn supports_json(&self) -> bool {
461 match self {
462 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
463 Serializer::Avro(_)
464 | Serializer::Cef(_)
465 | Serializer::Csv(_)
466 | Serializer::Logfmt(_)
467 | Serializer::Text(_)
468 | Serializer::Native(_)
469 | Serializer::Protobuf(_)
470 | Serializer::RawMessage(_) => false,
471 }
472 }
473
474 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
481 match self {
482 Serializer::Gelf(serializer) => serializer.to_json_value(event),
483 Serializer::Json(serializer) => serializer.to_json_value(event),
484 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
485 Serializer::Avro(_)
486 | Serializer::Cef(_)
487 | Serializer::Csv(_)
488 | Serializer::Logfmt(_)
489 | Serializer::Text(_)
490 | Serializer::Native(_)
491 | Serializer::Protobuf(_)
492 | Serializer::RawMessage(_) => {
493 panic!("Serializer does not support JSON")
494 }
495 }
496 }
497}
498
499impl From<AvroSerializer> for Serializer {
500 fn from(serializer: AvroSerializer) -> Self {
501 Self::Avro(serializer)
502 }
503}
504
505impl From<CefSerializer> for Serializer {
506 fn from(serializer: CefSerializer) -> Self {
507 Self::Cef(serializer)
508 }
509}
510
511impl From<CsvSerializer> for Serializer {
512 fn from(serializer: CsvSerializer) -> Self {
513 Self::Csv(serializer)
514 }
515}
516
517impl From<GelfSerializer> for Serializer {
518 fn from(serializer: GelfSerializer) -> Self {
519 Self::Gelf(serializer)
520 }
521}
522
523impl From<JsonSerializer> for Serializer {
524 fn from(serializer: JsonSerializer) -> Self {
525 Self::Json(serializer)
526 }
527}
528
529impl From<LogfmtSerializer> for Serializer {
530 fn from(serializer: LogfmtSerializer) -> Self {
531 Self::Logfmt(serializer)
532 }
533}
534
535impl From<NativeSerializer> for Serializer {
536 fn from(serializer: NativeSerializer) -> Self {
537 Self::Native(serializer)
538 }
539}
540
541impl From<NativeJsonSerializer> for Serializer {
542 fn from(serializer: NativeJsonSerializer) -> Self {
543 Self::NativeJson(serializer)
544 }
545}
546
547impl From<ProtobufSerializer> for Serializer {
548 fn from(serializer: ProtobufSerializer) -> Self {
549 Self::Protobuf(serializer)
550 }
551}
552
553impl From<RawMessageSerializer> for Serializer {
554 fn from(serializer: RawMessageSerializer) -> Self {
555 Self::RawMessage(serializer)
556 }
557}
558
559impl From<TextSerializer> for Serializer {
560 fn from(serializer: TextSerializer) -> Self {
561 Self::Text(serializer)
562 }
563}
564
565impl tokio_util::codec::Encoder<Event> for Serializer {
566 type Error = vector_common::Error;
567
568 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
569 match self {
570 Serializer::Avro(serializer) => serializer.encode(event, buffer),
571 Serializer::Cef(serializer) => serializer.encode(event, buffer),
572 Serializer::Csv(serializer) => serializer.encode(event, buffer),
573 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
574 Serializer::Json(serializer) => serializer.encode(event, buffer),
575 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
576 Serializer::Native(serializer) => serializer.encode(event, buffer),
577 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
578 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
579 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
580 Serializer::Text(serializer) => serializer.encode(event, buffer),
581 }
582 }
583}