// vector/codecs/decoding/decoder.rs

1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_lib::{
4    codecs::decoding::{
5        BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
6        format::Deserializer as _,
7    },
8    config::LogNamespace,
9};
10
11use crate::{
12    event::Event,
13    internal_events::{DecoderDeserializeError, DecoderFramingError},
14};
15
16/// A decoder that can decode structured events from a byte stream / byte
17/// messages.
18#[derive(Clone)]
19pub struct Decoder {
20    /// The framer being used.
21    pub framer: Framer,
22    /// The deserializer being used.
23    pub deserializer: Deserializer,
24    /// The `log_namespace` being used.
25    pub log_namespace: LogNamespace,
26}
27
28impl Default for Decoder {
29    fn default() -> Self {
30        Self {
31            framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
32            deserializer: Deserializer::Bytes(BytesDeserializer),
33            log_namespace: LogNamespace::Legacy,
34        }
35    }
36}
37
38impl Decoder {
39    /// Creates a new `Decoder` with the specified `Framer` to produce byte
40    /// frames from the byte stream / byte messages and `Deserializer` to parse
41    /// structured events from a byte frame.
42    pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
43        Self {
44            framer,
45            deserializer,
46            log_namespace: LogNamespace::Legacy,
47        }
48    }
49
50    /// Sets the log namespace that will be used when decoding.
51    pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
52        self.log_namespace = log_namespace;
53        self
54    }
55
56    /// Handles the framing result and parses it into a structured event, if
57    /// possible.
58    ///
59    /// Emits logs if either framing or parsing failed.
60    fn handle_framing_result(
61        &mut self,
62        frame: Result<Option<Bytes>, BoxedFramingError>,
63    ) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
64        let frame = frame.map_err(|error| {
65            emit!(DecoderFramingError { error: &error });
66            Error::FramingError(error)
67        })?;
68
69        frame
70            .map(|frame| self.deserializer_parse(frame))
71            .transpose()
72    }
73
74    /// Parses a frame using the included deserializer, and handles any errors by logging.
75    pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
76        let byte_size = frame.len();
77
78        // Parse structured events from the byte frame.
79        self.deserializer
80            .parse(frame, self.log_namespace)
81            .map(|events| (events, byte_size))
82            .map_err(|error| {
83                emit!(DecoderDeserializeError { error: &error });
84                Error::ParsingError(error)
85            })
86    }
87}
88
89impl tokio_util::codec::Decoder for Decoder {
90    type Item = (SmallVec<[Event; 1]>, usize);
91    type Error = Error;
92
93    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
94        let frame = self.framer.decode(buf);
95        self.handle_framing_result(frame)
96    }
97
98    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
99        let frame = self.framer.decode_eof(buf);
100        self.handle_framing_result(frame)
101    }
102}
103
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use futures::{StreamExt, stream};
    use tokio_util::{codec::FramedRead, io::StreamReader};
    use vector_lib::codecs::{
        JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
        decoding::{Deserializer, Framer},
    };
    use vrl::value::Value;

    use super::Decoder;

    /// A frame that fails to parse in the middle of the stream must surface as
    /// an error that reports `can_continue()`, and the frame after it must
    /// still decode.
    #[tokio::test]
    async fn framed_read_recover_from_error() {
        // Three newline-terminated chunks: valid JSON, garbage, valid JSON.
        let chunks = stream::iter(["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"])
            .map(|chunk| Ok::<_, std::io::Error>(Bytes::from(chunk)));
        let decoder = Decoder::new(
            Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
            Deserializer::Json(JsonDeserializer::default()),
        );
        let mut framed = FramedRead::new(StreamReader::new(chunks), decoder);

        // First frame decodes to a log event carrying `foo`.
        let (mut events, _byte_size) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("foo").unwrap(), &Value::from(1));

        // Second frame is not JSON: an error, but one the stream can continue past.
        let error = framed.next().await.unwrap().unwrap_err();
        assert!(error.can_continue());

        // Third frame decodes normally despite the preceding failure.
        let (mut events, _byte_size) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("bar").unwrap(), &Value::from(2));
    }
}