codecs/decoder_framed_read.rs
1use bytes::BytesMut;
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5 io,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::io::AsyncRead;
10use tokio_util::codec::{Decoder, FramedRead};
11
/// Private adapter that reports an inner decoder's failures as decoded items.
///
/// Where the wrapped decoder would return `Err(error)`, this adapter returns
/// `Ok(Some(Err(error)))` instead. `FramedRead` therefore does not see a decoder
/// failure and does not terminate the stream, while the caller still receives
/// the original error as a stream item.
struct DecoderResultWrapper<D> {
    /// The decoder whose results are being re-shaped.
    inner: D,
}
20
21impl<D> DecoderResultWrapper<D>
22where
23 D: Decoder,
24{
25 const fn new(inner: D) -> Self {
26 Self { inner }
27 }
28}
29
30impl<D> Decoder for DecoderResultWrapper<D>
31where
32 D: Decoder,
33{
34 type Item = Result<D::Item, D::Error>;
35 type Error = io::Error;
36
37 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38 match self.inner.decode(src) {
39 Ok(item) => Ok(item.map(Ok)),
40 Err(error) => Ok(Some(Err(error))),
41 }
42 }
43
44 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45 match self.inner.decode_eof(src) {
46 Ok(item) => Ok(item.map(Ok)),
47 Err(error) => Ok(Some(Err(error))),
48 }
49 }
50}
51
52/// A `tokio_util::codec::FramedRead` wrapper that continues decoding after recoverable decoder errors.
53///
54/// # Problem
55///
56/// The standard `tokio_util::codec::FramedRead` terminates the stream when a decoder
57/// returns an error. This is problematic for Vector because:
58/// - Vector decoders classify some errors as recoverable (e.g., malformed JSON in one line
59/// shouldn't stop processing subsequent valid lines)
60/// - Sources need to continue processing data even after encountering decode errors
61/// - Metrics and observability require tracking both successful and failed decode attempts
62///
63/// # Solution
64///
65/// `DecoderFramedRead` wraps the decoder in a `DecoderResultWrapper` that transforms
66/// decoder errors into successful results containing the error. This allows:
67/// - The stream to continue after errors
68/// - Callers to inspect errors and decide whether to continue (via `StreamDecodingError::can_continue()`)
69/// - Proper error metrics and logging
70///
71/// # When to Use
72///
73/// Use `DecoderFramedRead` when:
74/// - You're using a Vector `Decoder` that implements error recovery logic
75/// - You need to continue processing after decode errors
76/// - You're processing line-delimited or record-based formats where one bad record shouldn't stop processing
77///
78/// Use standard `FramedRead` when:
79/// - You're using simple decoders (e.g., `CharacterDelimitedDecoder`) that don't need error recovery
80/// - Any decode error should terminate the stream
81/// - You're working with binary protocols where errors indicate corruption
82///
83/// # Example
84///
85/// ```ignore
86/// use vector_lib::codecs::{DecoderFramedRead, Decoder};
87/// use futures::StreamExt;
88///
89/// let decoder = Decoder::new(
90/// Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
91/// Deserializer::Json(JsonDeserializer::default()),
92/// );
93///
94/// let mut stream = DecoderFramedRead::new(reader, decoder);
95///
96/// while let Some(result) = stream.next().await {
97/// match result {
98/// Ok(events) => process_events(events),
99/// Err(error) if error.can_continue() => {
100/// // Log the error but continue processing
101/// warn!("Decode error (continuing): {}", error);
102/// }
103/// Err(error) => {
104/// // Fatal error, stop processing
105/// error!("Fatal decode error: {}", error);
106/// break;
107/// }
108/// }
109/// }
110/// ```
111#[pin_project]
112pub struct DecoderFramedRead<T, D> {
113 #[pin]
114 inner: FramedRead<T, DecoderResultWrapper<D>>,
115}
116
117impl<T, D> DecoderFramedRead<T, D>
118where
119 T: AsyncRead,
120 D: Decoder,
121{
122 /// Creates a new `DecoderFramedRead` with the given decoder.
123 ///
124 /// This wraps the provided decoder to enable error recovery, allowing the stream
125 /// to continue processing after recoverable decode errors.
126 ///
127 /// # Arguments
128 ///
129 /// * `inner` - The async reader to read from
130 /// * `decoder` - The decoder to use for parsing data
131 pub fn new(inner: T, decoder: D) -> Self {
132 Self {
133 inner: FramedRead::new(inner, DecoderResultWrapper::new(decoder)),
134 }
135 }
136
137 /// Creates a new `DecoderFramedRead` with a specific buffer capacity.
138 ///
139 /// Use this when you know the expected message size to optimize memory usage.
140 ///
141 /// # Arguments
142 ///
143 /// * `inner` - The async reader to read from
144 /// * `decoder` - The decoder to use for parsing data
145 /// * `capacity` - The initial buffer capacity in bytes
146 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> Self {
147 Self {
148 inner: FramedRead::with_capacity(inner, DecoderResultWrapper::new(decoder), capacity),
149 }
150 }
151
152 /// Returns a reference to the underlying I/O stream.
153 ///
154 /// This is useful for accessing the underlying reader's properties or state
155 /// without consuming the `DecoderFramedRead`.
156 pub fn get_ref(&self) -> &T {
157 self.inner.get_ref()
158 }
159
160 /// Returns a mutable reference to the underlying I/O stream.
161 ///
162 /// This allows modifying the underlying reader's state, though care should be
163 /// taken not to interfere with ongoing decoding operations.
164 pub fn get_mut(&mut self) -> &mut T {
165 self.inner.get_mut()
166 }
167
168 /// Returns a reference to the internal read buffer.
169 ///
170 /// This provides access to any buffered but not yet decoded data. Useful for
171 /// debugging or implementing custom recovery logic.
172 pub fn read_buffer(&self) -> &BytesMut {
173 self.inner.read_buffer()
174 }
175}
176
177impl<T, D> Stream for DecoderFramedRead<T, D>
178where
179 T: AsyncRead,
180 D: Decoder,
181 D::Error: From<io::Error>,
182{
183 type Item = Result<D::Item, D::Error>;
184
185 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186 let this = self.project();
187
188 // The DecoderResultWrapper transforms errors into Ok(Err(...)) so the stream continues.
189 // We need to unwrap this double Result structure here.
190 match this.inner.poll_next(cx) {
191 Poll::Ready(Some(Ok(Ok(item)))) => Poll::Ready(Some(Ok(item))),
192 Poll::Ready(Some(Ok(Err(error)))) => Poll::Ready(Some(Err(error))),
193 Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error.into()))),
194 Poll::Ready(None) => Poll::Ready(None),
195 Poll::Pending => Poll::Pending,
196 }
197 }
198}