// vector/codecs/decoding/decoder.rs

1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_lib::codecs::decoding::{
4    format::Deserializer as _, BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer,
5    NewlineDelimitedDecoder,
6};
7use vector_lib::config::LogNamespace;
8
9use crate::{
10    event::Event,
11    internal_events::{DecoderDeserializeError, DecoderFramingError},
12};
13
14/// A decoder that can decode structured events from a byte stream / byte
15/// messages.
16#[derive(Clone)]
17pub struct Decoder {
18    /// The framer being used.
19    pub framer: Framer,
20    /// The deserializer being used.
21    pub deserializer: Deserializer,
22    /// The `log_namespace` being used.
23    pub log_namespace: LogNamespace,
24}
25
26impl Default for Decoder {
27    fn default() -> Self {
28        Self {
29            framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
30            deserializer: Deserializer::Bytes(BytesDeserializer),
31            log_namespace: LogNamespace::Legacy,
32        }
33    }
34}
35
36impl Decoder {
37    /// Creates a new `Decoder` with the specified `Framer` to produce byte
38    /// frames from the byte stream / byte messages and `Deserializer` to parse
39    /// structured events from a byte frame.
40    pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
41        Self {
42            framer,
43            deserializer,
44            log_namespace: LogNamespace::Legacy,
45        }
46    }
47
48    /// Sets the log namespace that will be used when decoding.
49    pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
50        self.log_namespace = log_namespace;
51        self
52    }
53
54    /// Handles the framing result and parses it into a structured event, if
55    /// possible.
56    ///
57    /// Emits logs if either framing or parsing failed.
58    fn handle_framing_result(
59        &mut self,
60        frame: Result<Option<Bytes>, BoxedFramingError>,
61    ) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
62        let frame = frame.map_err(|error| {
63            emit!(DecoderFramingError { error: &error });
64            Error::FramingError(error)
65        })?;
66
67        frame
68            .map(|frame| self.deserializer_parse(frame))
69            .transpose()
70    }
71
72    /// Parses a frame using the included deserializer, and handles any errors by logging.
73    pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
74        let byte_size = frame.len();
75
76        // Parse structured events from the byte frame.
77        self.deserializer
78            .parse(frame, self.log_namespace)
79            .map(|events| (events, byte_size))
80            .map_err(|error| {
81                emit!(DecoderDeserializeError { error: &error });
82                Error::ParsingError(error)
83            })
84    }
85}
86
87impl tokio_util::codec::Decoder for Decoder {
88    type Item = (SmallVec<[Event; 1]>, usize);
89    type Error = Error;
90
91    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
92        let frame = self.framer.decode(buf);
93        self.handle_framing_result(frame)
94    }
95
96    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
97        let frame = self.framer.decode_eof(buf);
98        self.handle_framing_result(frame)
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::Decoder;
    use bytes::Bytes;
    use futures::{stream, StreamExt};
    use tokio_util::{codec::FramedRead, io::StreamReader};
    use vector_lib::codecs::{
        decoding::{Deserializer, Framer},
        JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
    };
    use vrl::value::Value;

    /// A malformed frame in the middle of a stream must surface as a
    /// recoverable error, and the frames after it must still decode.
    #[tokio::test]
    async fn framed_read_recover_from_error() {
        let chunks = ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"];
        let source = stream::iter(chunks.into_iter().map(Bytes::from))
            .map(Ok::<_, std::io::Error>);
        let decoder = Decoder::new(
            Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
            Deserializer::Json(JsonDeserializer::default()),
        );
        let mut framed = FramedRead::new(StreamReader::new(source), decoder);

        // Frame 1: valid JSON.
        let (mut events, _) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("foo").unwrap(), &Value::from(1));

        // Frame 2: not JSON; the resulting error must allow decoding to continue.
        let failure = framed.next().await.unwrap().unwrap_err();
        assert!(failure.can_continue());

        // Frame 3: valid JSON again, proving the stream recovered.
        let (mut events, _) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("bar").unwrap(), &Value::from(2));
    }
}
141}