vector/codecs/encoding/
encoder.rs

1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3#[cfg(feature = "codecs-arrow")]
4use vector_lib::codecs::encoding::ArrowStreamSerializer;
5use vector_lib::codecs::{
6    CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
7    encoding::{Error, Framer, Serializer},
8};
9
10use crate::{
11    event::Event,
12    internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// The set of serializers able to encode an entire batch of events in one call.
///
/// Every variant is gated behind a cargo feature, so the enum has no variants
/// when none of those features is enabled.
#[derive(Clone, Debug)]
pub enum BatchSerializer {
    /// Serializes a batch using the Arrow IPC stream format.
    #[cfg(feature = "codecs-arrow")]
    Arrow(ArrowStreamSerializer),
}
22
23/// An encoder that encodes batches of events.
24#[derive(Debug, Clone)]
25pub struct BatchEncoder {
26    serializer: BatchSerializer,
27}
28
29impl BatchEncoder {
30    /// Creates a new `BatchEncoder` with the specified batch serializer.
31    pub const fn new(serializer: BatchSerializer) -> Self {
32        Self { serializer }
33    }
34
35    /// Get the batch serializer.
36    pub const fn serializer(&self) -> &BatchSerializer {
37        &self.serializer
38    }
39
40    /// Get the HTTP content type.
41    #[cfg(feature = "codecs-arrow")]
42    pub const fn content_type(&self) -> &'static str {
43        match &self.serializer {
44            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
45        }
46    }
47}
48
49impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
50    type Error = Error;
51
52    #[allow(unused_variables)]
53    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
54        #[allow(unreachable_patterns)]
55        match &mut self.serializer {
56            #[cfg(feature = "codecs-arrow")]
57            BatchSerializer::Arrow(serializer) => {
58                serializer.encode(events, buffer).map_err(|err| {
59                    use vector_lib::codecs::encoding::ArrowEncodingError;
60                    match err {
61                        ArrowEncodingError::NullConstraint { .. } => {
62                            Error::SchemaConstraintViolation(Box::new(err))
63                        }
64                        _ => Error::SerializingError(Box::new(err)),
65                    }
66                })
67            }
68            _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
69        }
70    }
71}
72
73/// An wrapper that supports both framed and batch encoding modes.
74#[derive(Debug, Clone)]
75pub enum EncoderKind {
76    /// Uses framing to encode individual events
77    Framed(Box<Encoder<Framer>>),
78    /// Encodes events in batches without framing
79    #[cfg(feature = "codecs-arrow")]
80    Batch(BatchEncoder),
81}
82
83#[derive(Debug, Clone)]
84/// An encoder that can encode structured events into byte frames.
85pub struct Encoder<Framer>
86where
87    Framer: Clone,
88{
89    framer: Framer,
90    serializer: Serializer,
91}
92
93impl Default for Encoder<Framer> {
94    fn default() -> Self {
95        Self {
96            framer: NewlineDelimitedEncoder::default().into(),
97            serializer: TextSerializerConfig::default().build().into(),
98        }
99    }
100}
101
102impl Default for Encoder<()> {
103    fn default() -> Self {
104        Self {
105            framer: (),
106            serializer: TextSerializerConfig::default().build().into(),
107        }
108    }
109}
110
111impl<Framer> Encoder<Framer>
112where
113    Framer: Clone,
114{
115    /// Serialize the event without applying framing.
116    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
117        let len = buffer.len();
118        let mut payload = buffer.split_off(len);
119
120        self.serialize_at_start(event, &mut payload)?;
121
122        buffer.unsplit(payload);
123
124        Ok(())
125    }
126
127    /// Serialize the event without applying framing, at the start of the provided buffer.
128    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
129        self.serializer.encode(event, buffer).map_err(|error| {
130            emit!(EncoderSerializeError { error: &error });
131            Error::SerializingError(error)
132        })
133    }
134}
135
136impl Encoder<Framer> {
137    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
138    /// from a structured event, and the `Framer` to wrap these into a byte
139    /// frame.
140    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
141        Self { framer, serializer }
142    }
143
144    /// Get the framer.
145    pub const fn framer(&self) -> &Framer {
146        &self.framer
147    }
148
149    /// Get the serializer.
150    pub const fn serializer(&self) -> &Serializer {
151        &self.serializer
152    }
153
154    /// Get the prefix that encloses a batch of events.
155    pub const fn batch_prefix(&self) -> &[u8] {
156        match (&self.framer, &self.serializer) {
157            (
158                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
159                Serializer::Json(_) | Serializer::NativeJson(_),
160            ) => b"[",
161            _ => &[],
162        }
163    }
164
165    /// Get the suffix that encloses a batch of events.
166    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
167        match (&self.framer, &self.serializer, empty) {
168            (
169                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
170                Serializer::Json(_) | Serializer::NativeJson(_),
171                _,
172            ) => b"]",
173            (Framer::NewlineDelimited(_), _, false) => b"\n",
174            _ => &[],
175        }
176    }
177
178    /// Get the HTTP content type.
179    pub const fn content_type(&self) -> &'static str {
180        match (&self.serializer, &self.framer) {
181            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
182                "application/x-ndjson"
183            }
184            (
185                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
186                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
187            ) => "application/json",
188            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
189            (
190                Serializer::Avro(_)
191                | Serializer::Cef(_)
192                | Serializer::Csv(_)
193                | Serializer::Gelf(_)
194                | Serializer::Json(_)
195                | Serializer::Logfmt(_)
196                | Serializer::NativeJson(_)
197                | Serializer::RawMessage(_)
198                | Serializer::Text(_),
199                _,
200            ) => "text/plain",
201            #[cfg(feature = "codecs-opentelemetry")]
202            (Serializer::Otlp(_), _) => "application/x-protobuf",
203        }
204    }
205}
206
207impl Encoder<()> {
208    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
209    /// from a structured event.
210    pub const fn new(serializer: Serializer) -> Self {
211        Self {
212            framer: (),
213            serializer,
214        }
215    }
216
217    /// Get the serializer.
218    pub const fn serializer(&self) -> &Serializer {
219        &self.serializer
220    }
221}
222
223impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
224    type Error = Error;
225
226    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
227        let len = buffer.len();
228        let mut payload = buffer.split_off(len);
229
230        self.serialize_at_start(event, &mut payload)?;
231
232        // Frame the serialized event.
233        self.framer.encode((), &mut payload).map_err(|error| {
234            emit!(EncoderFramingError { error: &error });
235            Error::FramingError(error)
236        })?;
237
238        buffer.unsplit(payload);
239
240        Ok(())
241    }
242}
243
244impl tokio_util::codec::Encoder<Event> for Encoder<()> {
245    type Error = Error;
246
247    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
248        let len = buffer.len();
249        let mut payload = buffer.split_off(len);
250
251        self.serialize_at_start(event, &mut payload)?;
252
253        buffer.unsplit(payload);
254
255        Ok(())
256    }
257}
258
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures_util::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_lib::{codecs::encoding::BoxedFramingError, event::LogEvent};

    use super::*;

    /// Test framer that encloses the content of the destination buffer in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Detach the payload, write the opening paren, then re-attach the
            // payload behind it before closing.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` and then fails the call whose
    /// zero-based index equals `fail_at`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self {
                inner: encoder,
                calls: 0,
                fail_at: n,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            self.inner.encode((), dst)?;
            // The counter only advances when the inner encoder succeeded.
            let call = self.calls;
            self.calls += 1;
            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that serializes events as text and frames them with `framer`.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(framer, TextSerializerConfig::default().build().into())
    }

    /// Builds a framer that wraps every event in parentheses.
    fn paren_framer() -> Framer {
        Framer::Boxed(Box::new(ParenEncoder::new()))
    }

    /// Builds a framer that wraps every event in parentheses and fails on the
    /// call with zero-based index `n`.
    fn failing_paren_framer(n: usize) -> Framer {
        Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), n)))
    }

    /// Builds one log event per message, in order.
    fn log_events(messages: &[&str]) -> Vec<Event> {
        messages
            .iter()
            .map(|&message| Event::Log(LogEvent::from(message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), text_encoder(paren_framer()));

        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::from("(foo)"), text_encoder(paren_framer()));

        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), text_encoder(failing_paren_framer(1)));

        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();

        // Only the event encoded before the failing one reaches the sink.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::from("(foo)"), text_encoder(failing_paren_framer(1)));

        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();

        // The pre-existing content is kept, followed by the one event that
        // was encoded before the failure.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let framer = Framer::NewlineDelimited(NewlineDelimitedEncoder::default());
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), text_encoder(framer));

        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"bar\nbaz\nbat\n");
    }
}
418}