codecs/encoding/
encoder.rs

1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8use crate::{
9    encoding::{Error, Framer, Serializer},
10    internal_events::{EncoderFramingError, EncoderSerializeError},
11};
12
/// The serializers that encode a whole batch of events in one call rather
/// than one event at a time.
///
/// Every variant is feature-gated, so with no batch features enabled this
/// enum has no variants at all.
#[derive(Debug, Clone)]
pub enum BatchSerializer {
    /// Writes the batch as an Arrow IPC stream.
    #[cfg(feature = "arrow")]
    Arrow(ArrowStreamSerializer),
}
20
21/// An encoder that encodes batches of events.
22#[derive(Debug, Clone)]
23pub struct BatchEncoder {
24    serializer: BatchSerializer,
25}
26
27impl BatchEncoder {
28    /// Creates a new `BatchEncoder` with the specified batch serializer.
29    pub const fn new(serializer: BatchSerializer) -> Self {
30        Self { serializer }
31    }
32
33    /// Get the batch serializer.
34    pub const fn serializer(&self) -> &BatchSerializer {
35        &self.serializer
36    }
37
38    /// Get the HTTP content type.
39    #[cfg(feature = "arrow")]
40    pub const fn content_type(&self) -> &'static str {
41        match &self.serializer {
42            BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
43        }
44    }
45}
46
47impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
48    type Error = Error;
49
50    #[allow(unused_variables)]
51    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
52        #[allow(unreachable_patterns)]
53        match &mut self.serializer {
54            #[cfg(feature = "arrow")]
55            BatchSerializer::Arrow(serializer) => {
56                serializer.encode(events, buffer).map_err(|err| {
57                    use crate::encoding::ArrowEncodingError;
58                    match err {
59                        ArrowEncodingError::NullConstraint { .. } => {
60                            Error::SchemaConstraintViolation(Box::new(err))
61                        }
62                        _ => Error::SerializingError(Box::new(err)),
63                    }
64                })
65            }
66            _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
67        }
68    }
69}
70
71/// An wrapper that supports both framed and batch encoding modes.
72#[derive(Debug, Clone)]
73pub enum EncoderKind {
74    /// Uses framing to encode individual events
75    Framed(Box<Encoder<Framer>>),
76    /// Encodes events in batches without framing
77    #[cfg(feature = "arrow")]
78    Batch(BatchEncoder),
79}
80
81#[derive(Debug, Clone)]
82/// An encoder that can encode structured events into byte frames.
83pub struct Encoder<Framer>
84where
85    Framer: Clone,
86{
87    framer: Framer,
88    serializer: Serializer,
89}
90
91impl Default for Encoder<Framer> {
92    fn default() -> Self {
93        use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
94
95        Self {
96            framer: NewlineDelimitedEncoder::default().into(),
97            serializer: TextSerializerConfig::default().build().into(),
98        }
99    }
100}
101
102impl Default for Encoder<()> {
103    fn default() -> Self {
104        use crate::encoding::TextSerializerConfig;
105
106        Self {
107            framer: (),
108            serializer: TextSerializerConfig::default().build().into(),
109        }
110    }
111}
112
113impl<Framer> Encoder<Framer>
114where
115    Framer: Clone,
116{
117    /// Serialize the event without applying framing.
118    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
119        let len = buffer.len();
120        let mut payload = buffer.split_off(len);
121
122        self.serialize_at_start(event, &mut payload)?;
123
124        buffer.unsplit(payload);
125
126        Ok(())
127    }
128
129    /// Serialize the event without applying framing, at the start of the provided buffer.
130    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
131        self.serializer.encode(event, buffer).map_err(|error| {
132            emit(EncoderSerializeError { error: &error });
133            Error::SerializingError(error)
134        })
135    }
136}
137
138impl Encoder<Framer> {
139    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
140    /// from a structured event, and the `Framer` to wrap these into a byte
141    /// frame.
142    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
143        Self { framer, serializer }
144    }
145
146    /// Get the framer.
147    pub const fn framer(&self) -> &Framer {
148        &self.framer
149    }
150
151    /// Get the serializer.
152    pub const fn serializer(&self) -> &Serializer {
153        &self.serializer
154    }
155
156    /// Get the prefix that encloses a batch of events.
157    pub const fn batch_prefix(&self) -> &[u8] {
158        match (&self.framer, &self.serializer) {
159            (
160                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
161                    delimiter: b',',
162                }),
163                Serializer::Json(_) | Serializer::NativeJson(_),
164            ) => b"[",
165            _ => &[],
166        }
167    }
168
169    /// Get the suffix that encloses a batch of events.
170    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
171        match (&self.framer, &self.serializer, empty) {
172            (
173                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
174                    delimiter: b',',
175                }),
176                Serializer::Json(_) | Serializer::NativeJson(_),
177                _,
178            ) => b"]",
179            (Framer::NewlineDelimited(_), _, false) => b"\n",
180            _ => &[],
181        }
182    }
183
184    /// Get the HTTP content type.
185    pub const fn content_type(&self) -> &'static str {
186        match (&self.serializer, &self.framer) {
187            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
188                "application/x-ndjson"
189            }
190            (
191                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
192                Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
193                    delimiter: b',',
194                }),
195            ) => "application/json",
196            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
197            (
198                Serializer::Avro(_)
199                | Serializer::Cef(_)
200                | Serializer::Csv(_)
201                | Serializer::Gelf(_)
202                | Serializer::Json(_)
203                | Serializer::Logfmt(_)
204                | Serializer::NativeJson(_)
205                | Serializer::RawMessage(_)
206                | Serializer::Text(_),
207                _,
208            ) => "text/plain",
209            #[cfg(feature = "syslog")]
210            (Serializer::Syslog(_), _) => "text/plain",
211            #[cfg(feature = "opentelemetry")]
212            (Serializer::Otlp(_), _) => "application/x-protobuf",
213        }
214    }
215}
216
217impl Encoder<()> {
218    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
219    /// from a structured event.
220    pub const fn new(serializer: Serializer) -> Self {
221        Self {
222            framer: (),
223            serializer,
224        }
225    }
226
227    /// Get the serializer.
228    pub const fn serializer(&self) -> &Serializer {
229        &self.serializer
230    }
231}
232
233impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
234    type Error = Error;
235
236    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
237        let len = buffer.len();
238        let mut payload = buffer.split_off(len);
239
240        self.serialize_at_start(event, &mut payload)?;
241
242        // Frame the serialized event.
243        self.framer.encode((), &mut payload).map_err(|error| {
244            emit(EncoderFramingError { error: &error });
245            Error::FramingError(error)
246        })?;
247
248        buffer.unsplit(payload);
249
250        Ok(())
251    }
252}
253
254impl tokio_util::codec::Encoder<Event> for Encoder<()> {
255    type Error = Error;
256
257    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
258        let len = buffer.len();
259        let mut payload = buffer.split_off(len);
260
261        self.serialize_at_start(event, &mut payload)?;
262
263        buffer.unsplit(payload);
264
265        Ok(())
266    }
267}
268
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::BoxedFramingError;

    /// Test framer that surrounds each serialized payload with `(` and `)`.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Detach the payload so the opening paren can be written in front of it.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` but reports an error on the call
    /// whose zero-based index equals `fail_at`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T> {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(inner: T, fail_at: usize) -> Self {
            Self {
                inner,
                calls: 0,
                fail_at,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            // The inner framer always runs first; its own errors skip the counter.
            self.inner.encode((), dst)?;
            let call = self.calls;
            self.calls += 1;
            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds a plain-text `Encoder` that frames each event with `framer`.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(
            framer,
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        )
    }

    /// Turns each message into a log event, preserving order.
    fn log_events(messages: &[&'static str]) -> Vec<Event> {
        messages
            .iter()
            .map(|&message| Event::Log(LogEvent::from(message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);
        source.forward(&mut framed).await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ErrorNthEncoder::new(
            ParenEncoder::new(),
            1,
        ))));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ErrorNthEncoder::new(
            ParenEncoder::new(),
            1,
        ))));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();
        assert_eq!(framed.into_inner(), b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            crate::encoding::NewlineDelimitedEncoder::default(),
        ));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();
        assert_eq!(framed.into_inner(), b"bar\nbaz\nbat\n");
    }
}