vector/codecs/encoding/
encoder.rs

1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_lib::codecs::{
4    CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
5    encoding::{Error, Framer, Serializer},
6};
7
8use crate::{
9    event::Event,
10    internal_events::{EncoderFramingError, EncoderSerializeError},
11};
12
13#[derive(Debug, Clone)]
14/// An encoder that can encode structured events into byte frames.
15pub struct Encoder<Framer>
16where
17    Framer: Clone,
18{
19    framer: Framer,
20    serializer: Serializer,
21}
22
23impl Default for Encoder<Framer> {
24    fn default() -> Self {
25        Self {
26            framer: NewlineDelimitedEncoder::default().into(),
27            serializer: TextSerializerConfig::default().build().into(),
28        }
29    }
30}
31
32impl Default for Encoder<()> {
33    fn default() -> Self {
34        Self {
35            framer: (),
36            serializer: TextSerializerConfig::default().build().into(),
37        }
38    }
39}
40
41impl<Framer> Encoder<Framer>
42where
43    Framer: Clone,
44{
45    /// Serialize the event without applying framing.
46    pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
47        let len = buffer.len();
48        let mut payload = buffer.split_off(len);
49
50        self.serialize_at_start(event, &mut payload)?;
51
52        buffer.unsplit(payload);
53
54        Ok(())
55    }
56
57    /// Serialize the event without applying framing, at the start of the provided buffer.
58    fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
59        self.serializer.encode(event, buffer).map_err(|error| {
60            emit!(EncoderSerializeError { error: &error });
61            Error::SerializingError(error)
62        })
63    }
64}
65
66impl Encoder<Framer> {
67    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
68    /// from a structured event, and the `Framer` to wrap these into a byte
69    /// frame.
70    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
71        Self { framer, serializer }
72    }
73
74    /// Get the framer.
75    pub const fn framer(&self) -> &Framer {
76        &self.framer
77    }
78
79    /// Get the serializer.
80    pub const fn serializer(&self) -> &Serializer {
81        &self.serializer
82    }
83
84    /// Get the prefix that encloses a batch of events.
85    pub const fn batch_prefix(&self) -> &[u8] {
86        match (&self.framer, &self.serializer) {
87            (
88                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
89                Serializer::Json(_) | Serializer::NativeJson(_),
90            ) => b"[",
91            _ => &[],
92        }
93    }
94
95    /// Get the suffix that encloses a batch of events.
96    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
97        match (&self.framer, &self.serializer, empty) {
98            (
99                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
100                Serializer::Json(_) | Serializer::NativeJson(_),
101                _,
102            ) => b"]",
103            (Framer::NewlineDelimited(_), _, false) => b"\n",
104            _ => &[],
105        }
106    }
107
108    /// Get the HTTP content type.
109    pub const fn content_type(&self) -> &'static str {
110        match (&self.serializer, &self.framer) {
111            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
112                "application/x-ndjson"
113            }
114            (
115                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
116                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
117            ) => "application/json",
118            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
119            (
120                Serializer::Avro(_)
121                | Serializer::Cef(_)
122                | Serializer::Csv(_)
123                | Serializer::Gelf(_)
124                | Serializer::Json(_)
125                | Serializer::Logfmt(_)
126                | Serializer::NativeJson(_)
127                | Serializer::RawMessage(_)
128                | Serializer::Text(_),
129                _,
130            ) => "text/plain",
131            #[cfg(feature = "codecs-opentelemetry")]
132            (Serializer::Otlp(_), _) => "application/x-protobuf",
133        }
134    }
135}
136
137impl Encoder<()> {
138    /// Creates a new `Encoder` with the specified `Serializer` to produce bytes
139    /// from a structured event.
140    pub const fn new(serializer: Serializer) -> Self {
141        Self {
142            framer: (),
143            serializer,
144        }
145    }
146
147    /// Get the serializer.
148    pub const fn serializer(&self) -> &Serializer {
149        &self.serializer
150    }
151}
152
153impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
154    type Error = Error;
155
156    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
157        let len = buffer.len();
158        let mut payload = buffer.split_off(len);
159
160        self.serialize_at_start(event, &mut payload)?;
161
162        // Frame the serialized event.
163        self.framer.encode((), &mut payload).map_err(|error| {
164            emit!(EncoderFramingError { error: &error });
165            Error::FramingError(error)
166        })?;
167
168        buffer.unsplit(payload);
169
170        Ok(())
171    }
172}
173
174impl tokio_util::codec::Encoder<Event> for Encoder<()> {
175    type Error = Error;
176
177    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
178        let len = buffer.len();
179        let mut payload = buffer.split_off(len);
180
181        self.serialize_at_start(event, &mut payload)?;
182
183        buffer.unsplit(payload);
184
185        Ok(())
186    }
187}
188
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures_util::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_lib::{codecs::encoding::BoxedFramingError, event::LogEvent};

    use super::*;

    /// Test framer that wraps the bytes it is given in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the payload out, write the opening paren, put the payload
            // back behind it, then close.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` and reports an error on the call
    /// whose zero-based index equals `fail_at`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self {
                inner: encoder,
                calls: 0,
                fail_at: n,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            // The inner framer always runs first; the injected failure is
            // reported afterwards.
            self.inner.encode((), dst)?;

            let should_fail = self.calls == self.fail_at;
            self.calls += 1;

            if should_fail {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that pairs `framer` with the default text serializer.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(framer, TextSerializerConfig::default().build().into())
    }

    /// Builds one log event per message, in order.
    fn log_events(messages: &[&str]) -> Vec<Event> {
        messages
            .iter()
            .map(|message| Event::Log(LogEvent::from(*message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(b"(foo)".to_vec(), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();

        // Only the event framed before the injected failure reaches the sink.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(b"(foo)".to_vec(), encoder);
        assert!(source.forward(&mut framed).await.is_err());
        framed.flush().await.unwrap();

        // The pre-existing sink contents and the first event survive.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            NewlineDelimitedEncoder::default(),
        ));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"bar\nbaz\nbat\n");
    }
}
348}