codecs/encoding/
encoder.rs

1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8#[cfg(feature = "parquet")]
9use crate::encoding::ParquetSerializer;
10use crate::{
11    encoding::{Error, Framer, Serializer},
12    internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// The set of serializers able to encode a whole batch of events in one call.
///
/// Every variant is gated behind its own cargo feature, so the enum has no
/// variants when none of those features are enabled.
#[derive(Clone, Debug)]
pub enum BatchSerializer {
    /// Serializer producing the Arrow IPC stream format.
    #[cfg(feature = "arrow")]
    Arrow(ArrowStreamSerializer),
    /// Serializer producing the Parquet format (boxed).
    #[cfg(feature = "parquet")]
    Parquet(Box<ParquetSerializer>),
}
25
26/// An encoder that encodes batches of events.
27#[derive(Debug, Clone)]
28pub struct BatchEncoder {
29    serializer: BatchSerializer,
30}
31
32impl BatchEncoder {
33    /// Creates a new `BatchEncoder` with the specified batch serializer.
34    pub const fn new(serializer: BatchSerializer) -> Self {
35        Self { serializer }
36    }
37
38    /// Get the batch serializer.
39    pub const fn serializer(&self) -> &BatchSerializer {
40        &self.serializer
41    }
42
43    /// Get the HTTP content type.
44    #[cfg(any(feature = "arrow", feature = "parquet"))]
45    pub const fn content_type(&self) -> &'static str {
46        match &self.serializer {
47            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
48            #[cfg(feature = "parquet")]
49            BatchSerializer::Parquet(_) => "application/vnd.apache.parquet",
50        }
51    }
52}
53
54impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
55    type Error = Error;
56
57    #[allow(unused_variables)]
58    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
59        #[allow(unreachable_patterns)]
60        match &mut self.serializer {
61            #[cfg(feature = "arrow")]
62            BatchSerializer::Arrow(serializer) => {
63                serializer.encode(events, buffer).map_err(|err| {
64                    use crate::encoding::ArrowEncodingError;
65                    match err {
66                        ArrowEncodingError::NullConstraint { .. } => {
67                            Error::SchemaConstraintViolation(Box::new(err))
68                        }
69                        _ => Error::SerializingError(Box::new(err)),
70                    }
71                })
72            }
73            #[cfg(feature = "parquet")]
74            BatchSerializer::Parquet(serializer) => serializer
75                .encode(events, buffer)
76                .map_err(Error::SerializingError),
77            #[allow(unreachable_patterns)]
78            _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
79        }
80    }
81}
82
83/// An wrapper that supports both framed and batch encoding modes.
84#[derive(Debug, Clone)]
85pub enum EncoderKind {
86    /// Uses framing to encode individual events
87    Framed(Box<Encoder<Framer>>),
88    /// Encodes events in batches without framing
89    #[cfg(any(feature = "arrow", feature = "parquet"))]
90    Batch(BatchEncoder),
91}
92
93#[derive(Debug, Clone)]
94/// An encoder that can encode structured events into byte frames.
95pub struct Encoder<Framer>
96where
97    Framer: Clone,
98{
99    framer: Framer,
100    serializer: Serializer,
101}
102
103impl Default for Encoder<Framer> {
104    fn default() -> Self {
105        use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
106
107        Self {
108            framer: NewlineDelimitedEncoder::default().into(),
109            serializer: TextSerializerConfig::default().build().into(),
110        }
111    }
112}
113
114impl Default for Encoder<()> {
115    fn default() -> Self {
116        use crate::encoding::TextSerializerConfig;
117
118        Self {
119            framer: (),
120            serializer: TextSerializerConfig::default().build().into(),
121        }
122    }
123}
124
125impl<Framer> Encoder<Framer>
126where
127    Framer: Clone,
128{
129    /// Serialize the event without applying framing.
130    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
131        let len = buffer.len();
132        let mut payload = buffer.split_off(len);
133
134        self.serialize_at_start(event, &mut payload)?;
135
136        buffer.unsplit(payload);
137
138        Ok(())
139    }
140
141    /// Serialize the event without applying framing, at the start of the provided buffer.
142    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
143        self.serializer.encode(event, buffer).map_err(|error| {
144            emit(EncoderSerializeError { error: &error });
145            Error::SerializingError(error)
146        })
147    }
148}
149
150impl Encoder<Framer> {
151    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
152    /// from a structured event, and the `Framer` to wrap these into a byte
153    /// frame.
154    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
155        Self { framer, serializer }
156    }
157
158    /// Get the framer.
159    pub const fn framer(&self) -> &Framer {
160        &self.framer
161    }
162
163    /// Get the serializer.
164    pub const fn serializer(&self) -> &Serializer {
165        &self.serializer
166    }
167
168    /// Get the prefix that encloses a batch of events.
169    pub const fn batch_prefix(&self) -> &[u8] {
170        match (&self.framer, &self.serializer) {
171            (
172                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
173                    delimiter: b',',
174                }),
175                Serializer::Json(_) | Serializer::NativeJson(_),
176            ) => b"[",
177            _ => &[],
178        }
179    }
180
181    /// Get the suffix that encloses a batch of events.
182    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
183        match (&self.framer, &self.serializer, empty) {
184            (
185                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
186                    delimiter: b',',
187                }),
188                Serializer::Json(_) | Serializer::NativeJson(_),
189                _,
190            ) => b"]",
191            (Framer::NewlineDelimited(_), _, false) => b"\n",
192            _ => &[],
193        }
194    }
195
196    /// Get the HTTP content type.
197    pub const fn content_type(&self) -> &'static str {
198        match (&self.serializer, &self.framer) {
199            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
200                "application/x-ndjson"
201            }
202            (
203                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
204                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
205                    delimiter: b',',
206                }),
207            ) => "application/json",
208            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
209            (
210                Serializer::Avro(_)
211                | Serializer::Cef(_)
212                | Serializer::Csv(_)
213                | Serializer::Gelf(_)
214                | Serializer::Json(_)
215                | Serializer::Logfmt(_)
216                | Serializer::NativeJson(_)
217                | Serializer::RawMessage(_)
218                | Serializer::Text(_),
219                _,
220            ) => "text/plain",
221            #[cfg(feature = "syslog")]
222            (Serializer::Syslog(_), _) => "text/plain",
223            #[cfg(feature = "opentelemetry")]
224            (Serializer::Otlp(_), _) => "application/x-protobuf",
225        }
226    }
227}
228
229impl Encoder<()> {
230    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
231    /// from a structured event.
232    pub const fn new(serializer: Serializer) -> Self {
233        Self {
234            framer: (),
235            serializer,
236        }
237    }
238
239    /// Get the serializer.
240    pub const fn serializer(&self) -> &Serializer {
241        &self.serializer
242    }
243}
244
245impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
246    type Error = Error;
247
248    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
249        let len = buffer.len();
250        let mut payload = buffer.split_off(len);
251
252        self.serialize_at_start(event, &mut payload)?;
253
254        // Frame the serialized event.
255        self.framer.encode((), &mut payload).map_err(|error| {
256            emit(EncoderFramingError { error: &error });
257            Error::FramingError(error)
258        })?;
259
260        buffer.unsplit(payload);
261
262        Ok(())
263    }
264}
265
266impl tokio_util::codec::Encoder<Event> for Encoder<()> {
267    type Error = Error;
268
269    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
270        let len = buffer.len();
271        let mut payload = buffer.split_off(len);
272
273        self.serialize_at_start(event, &mut payload)?;
274
275        buffer.unsplit(payload);
276
277        Ok(())
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use bytes::BufMut;
284    use futures::{SinkExt, StreamExt};
285    use tokio_util::codec::FramedWrite;
286    use vector_core::event::LogEvent;
287
288    use super::*;
289    use crate::encoding::BoxedFramingError;
290
291    #[derive(Debug, Clone)]
292    struct ParenEncoder;
293
294    impl ParenEncoder {
295        pub(super) const fn new() -> Self {
296            Self
297        }
298    }
299
300    impl tokio_util::codec::Encoder<()> for ParenEncoder {
301        type Error = BoxedFramingError;
302
303        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
304            dst.reserve(2);
305            let inner = dst.split();
306            dst.put_u8(b'(');
307            dst.unsplit(inner);
308            dst.put_u8(b')');
309            Ok(())
310        }
311    }
312
313    #[derive(Debug, Clone)]
314    struct ErrorNthEncoder<T>(T, usize, usize)
315    where
316        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>;
317
318    impl<T> ErrorNthEncoder<T>
319    where
320        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
321    {
322        pub(super) const fn new(encoder: T, n: usize) -> Self {
323            Self(encoder, 0, n)
324        }
325    }
326
327    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
328    where
329        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
330    {
331        type Error = BoxedFramingError;
332
333        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
334            self.0.encode((), dst)?;
335            let result = if self.1 == self.2 {
336                Err(Box::new(std::io::Error::other("error")) as _)
337            } else {
338                Ok(())
339            };
340            self.1 += 1;
341            result
342        }
343    }
344
345    #[tokio::test]
346    async fn test_encode_events_sink_empty() {
347        let encoder = Encoder::<Framer>::new(
348            Framer::Boxed(Box::new(ParenEncoder::new())),
349            crate::encoding::TextSerializerConfig::default()
350                .build()
351                .into(),
352        );
353        let source = futures::stream::iter(vec![
354            Event::Log(LogEvent::from("foo")),
355            Event::Log(LogEvent::from("bar")),
356            Event::Log(LogEvent::from("baz")),
357        ])
358        .map(Ok);
359        let sink = Vec::new();
360        let mut framed = FramedWrite::new(sink, encoder);
361        source.forward(&mut framed).await.unwrap();
362        let sink = framed.into_inner();
363        assert_eq!(sink, b"(foo)(bar)(baz)");
364    }
365
366    #[tokio::test]
367    async fn test_encode_events_sink_non_empty() {
368        let encoder = Encoder::<Framer>::new(
369            Framer::Boxed(Box::new(ParenEncoder::new())),
370            crate::encoding::TextSerializerConfig::default()
371                .build()
372                .into(),
373        );
374        let source = futures::stream::iter(vec![
375            Event::Log(LogEvent::from("bar")),
376            Event::Log(LogEvent::from("baz")),
377            Event::Log(LogEvent::from("bat")),
378        ])
379        .map(Ok);
380        let sink = Vec::from("(foo)");
381        let mut framed = FramedWrite::new(sink, encoder);
382        source.forward(&mut framed).await.unwrap();
383        let sink = framed.into_inner();
384        assert_eq!(sink, b"(foo)(bar)(baz)(bat)");
385    }
386
387    #[tokio::test]
388    async fn test_encode_events_sink_empty_handle_framing_error() {
389        let encoder = Encoder::<Framer>::new(
390            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
391            crate::encoding::TextSerializerConfig::default()
392                .build()
393                .into(),
394        );
395        let source = futures::stream::iter(vec![
396            Event::Log(LogEvent::from("foo")),
397            Event::Log(LogEvent::from("bar")),
398            Event::Log(LogEvent::from("baz")),
399        ])
400        .map(Ok);
401        let sink = Vec::new();
402        let mut framed = FramedWrite::new(sink, encoder);
403        assert!(source.forward(&mut framed).await.is_err());
404        framed.flush().await.unwrap();
405        let sink = framed.into_inner();
406        assert_eq!(sink, b"(foo)");
407    }
408
409    #[tokio::test]
410    async fn test_encode_events_sink_non_empty_handle_framing_error() {
411        let encoder = Encoder::<Framer>::new(
412            Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), 1))),
413            crate::encoding::TextSerializerConfig::default()
414                .build()
415                .into(),
416        );
417        let source = futures::stream::iter(vec![
418            Event::Log(LogEvent::from("bar")),
419            Event::Log(LogEvent::from("baz")),
420            Event::Log(LogEvent::from("bat")),
421        ])
422        .map(Ok);
423        let sink = Vec::from("(foo)");
424        let mut framed = FramedWrite::new(sink, encoder);
425        assert!(source.forward(&mut framed).await.is_err());
426        framed.flush().await.unwrap();
427        let sink = framed.into_inner();
428        assert_eq!(sink, b"(foo)(bar)");
429    }
430
431    #[tokio::test]
432    async fn test_encode_batch_newline() {
433        let encoder = Encoder::<Framer>::new(
434            Framer::NewlineDelimited(crate::encoding::NewlineDelimitedEncoder::default()),
435            crate::encoding::TextSerializerConfig::default()
436                .build()
437                .into(),
438        );
439        let source = futures::stream::iter(vec![
440            Event::Log(LogEvent::from("bar")),
441            Event::Log(LogEvent::from("baz")),
442            Event::Log(LogEvent::from("bat")),
443        ])
444        .map(Ok);
445        let sink: Vec<u8> = Vec::new();
446        let mut framed = FramedWrite::new(sink, encoder);
447        source.forward(&mut framed).await.unwrap();
448        let sink = framed.into_inner();
449        assert_eq!(sink, b"bar\nbaz\nbat\n");
450    }
451}