vector/codecs/decoding/
decoder.rs1use bytes::{Bytes, BytesMut};
2use smallvec::SmallVec;
3use vector_lib::{
4 codecs::decoding::{
5 BoxedFramingError, BytesDeserializer, Deserializer, Error, Framer, NewlineDelimitedDecoder,
6 format::Deserializer as _,
7 },
8 config::LogNamespace,
9};
10
11use crate::{
12 event::Event,
13 internal_events::{DecoderDeserializeError, DecoderFramingError},
14};
15
16#[derive(Clone)]
19pub struct Decoder {
20 pub framer: Framer,
22 pub deserializer: Deserializer,
24 pub log_namespace: LogNamespace,
26}
27
28impl Default for Decoder {
29 fn default() -> Self {
30 Self {
31 framer: Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
32 deserializer: Deserializer::Bytes(BytesDeserializer),
33 log_namespace: LogNamespace::Legacy,
34 }
35 }
36}
37
38impl Decoder {
39 pub const fn new(framer: Framer, deserializer: Deserializer) -> Self {
43 Self {
44 framer,
45 deserializer,
46 log_namespace: LogNamespace::Legacy,
47 }
48 }
49
50 pub const fn with_log_namespace(mut self, log_namespace: LogNamespace) -> Self {
52 self.log_namespace = log_namespace;
53 self
54 }
55
56 fn handle_framing_result(
61 &mut self,
62 frame: Result<Option<Bytes>, BoxedFramingError>,
63 ) -> Result<Option<(SmallVec<[Event; 1]>, usize)>, Error> {
64 let frame = frame.map_err(|error| {
65 emit!(DecoderFramingError { error: &error });
66 Error::FramingError(error)
67 })?;
68
69 frame
70 .map(|frame| self.deserializer_parse(frame))
71 .transpose()
72 }
73
74 pub fn deserializer_parse(&self, frame: Bytes) -> Result<(SmallVec<[Event; 1]>, usize), Error> {
76 let byte_size = frame.len();
77
78 self.deserializer
80 .parse(frame, self.log_namespace)
81 .map(|events| (events, byte_size))
82 .map_err(|error| {
83 emit!(DecoderDeserializeError { error: &error });
84 Error::ParsingError(error)
85 })
86 }
87}
88
89impl tokio_util::codec::Decoder for Decoder {
90 type Item = (SmallVec<[Event; 1]>, usize);
91 type Error = Error;
92
93 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
94 let frame = self.framer.decode(buf);
95 self.handle_framing_result(frame)
96 }
97
98 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
99 let frame = self.framer.decode_eof(buf);
100 self.handle_framing_result(frame)
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use bytes::Bytes;
107 use futures::{StreamExt, stream};
108 use tokio_util::{codec::FramedRead, io::StreamReader};
109 use vector_lib::codecs::{
110 JsonDeserializer, NewlineDelimitedDecoder, StreamDecodingError,
111 decoding::{Deserializer, Framer},
112 };
113 use vrl::value::Value;
114
115 use super::Decoder;
116
117 #[tokio::test]
118 async fn framed_read_recover_from_error() {
119 let iter = stream::iter(
120 ["{ \"foo\": 1 }\n", "invalid\n", "{ \"bar\": 2 }\n"]
121 .into_iter()
122 .map(Bytes::from),
123 );
124 let stream = iter.map(Ok::<_, std::io::Error>);
125 let reader = StreamReader::new(stream);
126 let decoder = Decoder::new(
127 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
128 Deserializer::Json(JsonDeserializer::default()),
129 );
130 let mut stream = FramedRead::new(reader, decoder);
131
132 let next = stream.next().await.unwrap();
133 let event = next.unwrap().0.pop().unwrap().into_log();
134 assert_eq!(event.get("foo").unwrap(), &Value::from(1));
135
136 let next = stream.next().await.unwrap();
137 let error = next.unwrap_err();
138 assert!(error.can_continue());
139
140 let next = stream.next().await.unwrap();
141 let event = next.unwrap().0.pop().unwrap().into_log();
142 assert_eq!(event.get("bar").unwrap(), &Value::from(2));
143 }
144}