codecs/
decoder_framed_read.rs

1use bytes::BytesMut;
2use futures::Stream;
3use pin_project::pin_project;
4use std::{
5    io,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::io::AsyncRead;
10use tokio_util::codec::{Decoder, FramedRead};
11
/// Private decoder adapter that reports the inner decoder's failures as items.
///
/// Where the wrapped decoder would return `Err(error)`, this adapter returns
/// `Ok(Some(Err(error)))` instead. `FramedRead` therefore sees an ordinary frame
/// and does not terminate the stream, while the caller still receives the
/// original error.
struct DecoderResultWrapper<D> {
    /// The decoder whose results are re-wrapped.
    inner: D,
}
20
21impl<D> DecoderResultWrapper<D>
22where
23    D: Decoder,
24{
25    const fn new(inner: D) -> Self {
26        Self { inner }
27    }
28}
29
30impl<D> Decoder for DecoderResultWrapper<D>
31where
32    D: Decoder,
33{
34    type Item = Result<D::Item, D::Error>;
35    type Error = io::Error;
36
37    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
38        match self.inner.decode(src) {
39            Ok(item) => Ok(item.map(Ok)),
40            Err(error) => Ok(Some(Err(error))),
41        }
42    }
43
44    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
45        match self.inner.decode_eof(src) {
46            Ok(item) => Ok(item.map(Ok)),
47            Err(error) => Ok(Some(Err(error))),
48        }
49    }
50}
51
52/// A `tokio_util::codec::FramedRead` wrapper that continues decoding after recoverable decoder errors.
53///
54/// # Problem
55///
56/// The standard `tokio_util::codec::FramedRead` terminates the stream when a decoder
57/// returns an error. This is problematic for Vector because:
58/// - Vector decoders classify some errors as recoverable (e.g., malformed JSON in one line
59///   shouldn't stop processing subsequent valid lines)
60/// - Sources need to continue processing data even after encountering decode errors
61/// - Metrics and observability require tracking both successful and failed decode attempts
62///
63/// # Solution
64///
65/// `DecoderFramedRead` wraps the decoder in a `DecoderResultWrapper` that transforms
66/// decoder errors into successful results containing the error. This allows:
67/// - The stream to continue after errors
68/// - Callers to inspect errors and decide whether to continue (via `StreamDecodingError::can_continue()`)
69/// - Proper error metrics and logging
70///
71/// # When to Use
72///
73/// Use `DecoderFramedRead` when:
74/// - You're using a Vector `Decoder` that implements error recovery logic
75/// - You need to continue processing after decode errors
76/// - You're processing line-delimited or record-based formats where one bad record shouldn't stop processing
77///
78/// Use standard `FramedRead` when:
79/// - You're using simple decoders (e.g., `CharacterDelimitedDecoder`) that don't need error recovery
80/// - Any decode error should terminate the stream
81/// - You're working with binary protocols where errors indicate corruption
82///
83/// # Example
84///
85/// ```ignore
86/// use vector_lib::codecs::{DecoderFramedRead, Decoder};
87/// use futures::StreamExt;
88///
89/// let decoder = Decoder::new(
90///     Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
91///     Deserializer::Json(JsonDeserializer::default()),
92/// );
93///
94/// let mut stream = DecoderFramedRead::new(reader, decoder);
95///
96/// while let Some(result) = stream.next().await {
97///     match result {
98///         Ok(events) => process_events(events),
99///         Err(error) if error.can_continue() => {
100///             // Log the error but continue processing
101///             warn!("Decode error (continuing): {}", error);
102///         }
103///         Err(error) => {
104///             // Fatal error, stop processing
105///             error!("Fatal decode error: {}", error);
106///             break;
107///         }
108///     }
109/// }
110/// ```
111#[pin_project]
112pub struct DecoderFramedRead<T, D> {
113    #[pin]
114    inner: FramedRead<T, DecoderResultWrapper<D>>,
115}
116
117impl<T, D> DecoderFramedRead<T, D>
118where
119    T: AsyncRead,
120    D: Decoder,
121{
122    /// Creates a new `DecoderFramedRead` with the given decoder.
123    ///
124    /// This wraps the provided decoder to enable error recovery, allowing the stream
125    /// to continue processing after recoverable decode errors.
126    ///
127    /// # Arguments
128    ///
129    /// * `inner` - The async reader to read from
130    /// * `decoder` - The decoder to use for parsing data
131    pub fn new(inner: T, decoder: D) -> Self {
132        Self {
133            inner: FramedRead::new(inner, DecoderResultWrapper::new(decoder)),
134        }
135    }
136
137    /// Creates a new `DecoderFramedRead` with a specific buffer capacity.
138    ///
139    /// Use this when you know the expected message size to optimize memory usage.
140    ///
141    /// # Arguments
142    ///
143    /// * `inner` - The async reader to read from
144    /// * `decoder` - The decoder to use for parsing data
145    /// * `capacity` - The initial buffer capacity in bytes
146    pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> Self {
147        Self {
148            inner: FramedRead::with_capacity(inner, DecoderResultWrapper::new(decoder), capacity),
149        }
150    }
151
152    /// Returns a reference to the underlying I/O stream.
153    ///
154    /// This is useful for accessing the underlying reader's properties or state
155    /// without consuming the `DecoderFramedRead`.
156    pub fn get_ref(&self) -> &T {
157        self.inner.get_ref()
158    }
159
160    /// Returns a mutable reference to the underlying I/O stream.
161    ///
162    /// This allows modifying the underlying reader's state, though care should be
163    /// taken not to interfere with ongoing decoding operations.
164    pub fn get_mut(&mut self) -> &mut T {
165        self.inner.get_mut()
166    }
167
168    /// Returns a reference to the internal read buffer.
169    ///
170    /// This provides access to any buffered but not yet decoded data. Useful for
171    /// debugging or implementing custom recovery logic.
172    pub fn read_buffer(&self) -> &BytesMut {
173        self.inner.read_buffer()
174    }
175}
176
177impl<T, D> Stream for DecoderFramedRead<T, D>
178where
179    T: AsyncRead,
180    D: Decoder,
181    D::Error: From<io::Error>,
182{
183    type Item = Result<D::Item, D::Error>;
184
185    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
186        let this = self.project();
187
188        // The DecoderResultWrapper transforms errors into Ok(Err(...)) so the stream continues.
189        // We need to unwrap this double Result structure here.
190        match this.inner.poll_next(cx) {
191            Poll::Ready(Some(Ok(Ok(item)))) => Poll::Ready(Some(Ok(item))),
192            Poll::Ready(Some(Ok(Err(error)))) => Poll::Ready(Some(Err(error))),
193            Poll::Ready(Some(Err(error))) => Poll::Ready(Some(Err(error.into()))),
194            Poll::Ready(None) => Poll::Ready(None),
195            Poll::Pending => Poll::Pending,
196        }
197    }
198}