vector/codecs/decoding/
decoder.rs1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_lib::codecs::decoding::{
4 format::Deserializer as _, BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer,
5 NewlineDelimitedDecoder,
6};
7use vector_lib::config::LogNamespace;
8
9use crate::{
10 event::Event,
11 internal_events::{DecoderDeserializeError, DecoderFramingError},
12};
13
14#[derive(Clone)]
17pub struct Decoder {
18 pub framer: Framer,
20 pub deserializer: Deserializer,
22 pub log_namespace: LogNamespace,
24}
25
26impl Default for Decoder {
27 fn default() -> Self {
28 Self {
29 framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
30 deserializer: Deserializer::Bytes(BytesDeserializer),
31 log_namespace: LogNamespace::Legacy,
32 }
33 }
34}
35
36impl Decoder {
37 pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
41 Self {
42 framer,
43 deserializer,
44 log_namespace: LogNamespace::Legacy,
45 }
46 }
47
48 pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
50 self.log_namespace = log_namespace;
51 self
52 }
53
54 fn handle_framing_result(
59 &mut self,
60 frame: Result<Option<Bytes>, BoxedFramingError>,
61 ) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
62 let frame = frame.map_err(|error| {
63 emit!(DecoderFramingError { error: &error });
64 Error::FramingError(error)
65 })?;
66
67 frame
68 .map(|frame| self.deserializer_parse(frame))
69 .transpose()
70 }
71
72 pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
74 let byte_size = frame.len();
75
76 self.deserializer
78 .parse(frame, self.log_namespace)
79 .map(|events| (events, byte_size))
80 .map_err(|error| {
81 emit!(DecoderDeserializeError { error: &error });
82 Error::ParsingError(error)
83 })
84 }
85}
86
87impl tokio_util::codec::Decoder for Decoder {
88 type Item = (SmallVec<[Event; 1]>, usize);
89 type Error = Error;
90
91 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
92 let frame = self.framer.decode(buf);
93 self.handle_framing_result(frame)
94 }
95
96 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
97 let frame = self.framer.decode_eof(buf);
98 self.handle_framing_result(frame)
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::Decoder;
    use bytes::Bytes;
    use futures::{stream, StreamExt};
    use tokio_util::{codec::FramedRead, io::StreamReader};
    use vector_lib::codecs::{
        decoding::{Deserializer, Framer},
        JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
    };
    use vrl::value::Value;

    /// A frame that fails to deserialize must not end the stream: the frames
    /// before and after it are still decoded, and the error in between reports
    /// that decoding can continue.
    #[tokio::test]
    async fn framed_read_recover_from_error() {
        // Three newline-terminated chunks; the middle one is not valid JSON.
        let chunks = ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"]
            .into_iter()
            .map(|chunk| Ok::<_, std::io::Error>(Bytes::from(chunk)));
        let reader = StreamReader::new(stream::iter(chunks));

        let decoder = Decoder::new(
            Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
            Deserializer::Json(JsonDeserializer::default()),
        );
        let mut framed = FramedRead::new(reader, decoder);

        // First frame decodes normally.
        let (mut events, _byte_size) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("foo").unwrap(), &Value::from(1));

        // Second frame yields an error the stream can recover from.
        let failure = framed.next().await.unwrap().unwrap_err();
        assert!(failure.can_continue());

        // Third frame is decoded despite the preceding failure.
        let (mut events, _byte_size) = framed.next().await.unwrap().unwrap();
        let log = events.pop().unwrap().into_log();
        assert_eq!(log.get("bar").unwrap(), &Value::from(2));
    }
}