codecs/decoding/framing/
newline_delimited.rs

1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, CharacterDelimitedDecoder};
7
8/// Config used to build a `NewlineDelimitedDecoder`.
9#[configurable_component]
10#[derive(Debug, Clone, Default, PartialEq, Eq)]
11pub struct NewlineDelimitedDecoderConfig {
12    /// Options for the newline delimited decoder.
13    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
14    pub newline_delimited: NewlineDelimitedDecoderOptions,
15}
16
17/// Options for building a `NewlineDelimitedDecoder`.
18#[configurable_component]
19#[derive(Clone, Debug, Derivative, PartialEq, Eq)]
20#[derivative(Default)]
21pub struct NewlineDelimitedDecoderOptions {
22    /// The maximum length of the byte buffer.
23    ///
24    /// This length does *not* include the trailing delimiter.
25    ///
26    /// By default, there is no maximum length enforced. If events are malformed, this can lead to
27    /// additional resource usage as events continue to be buffered in memory, and can potentially
28    /// lead to memory exhaustion in extreme cases.
29    ///
30    /// If there is a risk of processing malformed data, such as logs with user-controlled input,
31    /// consider setting the maximum length to a reasonably large value as a safety net. This
32    /// ensures that processing is not actually unbounded.
33    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
34    pub max_length: Option<usize>,
35}
36
37impl NewlineDelimitedDecoderOptions {
38    /// Creates a `NewlineDelimitedDecoderOptions` with a maximum frame length limit.
39    pub const fn new_with_max_length(max_length: usize) -> Self {
40        Self {
41            max_length: Some(max_length),
42        }
43    }
44}
45
46impl NewlineDelimitedDecoderConfig {
47    /// Creates a new `NewlineDelimitedDecoderConfig`.
48    pub fn new() -> Self {
49        Default::default()
50    }
51
52    /// Creates a `NewlineDelimitedDecoder` with a maximum frame length limit.
53    pub const fn new_with_max_length(max_length: usize) -> Self {
54        Self {
55            newline_delimited: { NewlineDelimitedDecoderOptions::new_with_max_length(max_length) },
56        }
57    }
58
59    /// Build the `NewlineDelimitedDecoder` from this configuration.
60    pub const fn build(&self) -> NewlineDelimitedDecoder {
61        if let Some(max_length) = self.newline_delimited.max_length {
62            NewlineDelimitedDecoder::new_with_max_length(max_length)
63        } else {
64            NewlineDelimitedDecoder::new()
65        }
66    }
67}
68
69/// A codec for handling bytes that are delimited by (a) newline(s).
70#[derive(Debug, Clone)]
71pub struct NewlineDelimitedDecoder(CharacterDelimitedDecoder);
72
73impl NewlineDelimitedDecoder {
74    /// Creates a new `NewlineDelimitedDecoder`.
75    pub const fn new() -> Self {
76        Self(CharacterDelimitedDecoder::new(b'\n'))
77    }
78
79    /// Creates a `NewlineDelimitedDecoder` with a maximum frame length limit.
80    ///
81    /// Any frames longer than `max_length` bytes will be discarded entirely.
82    pub const fn new_with_max_length(max_length: usize) -> Self {
83        Self(CharacterDelimitedDecoder::new_with_max_length(
84            b'\n', max_length,
85        ))
86    }
87}
88
89impl Default for NewlineDelimitedDecoder {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl Decoder for NewlineDelimitedDecoder {
96    type Item = Bytes;
97    type Error = BoxedFramingError;
98
99    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
100        self.0.decode(src)
101    }
102
103    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
104        self.0.decode_eof(src)
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `raw` to a fresh decoder and checks the result of each successive `step` call.
    ///
    /// `step` is invoked exactly once per entry of `expected`, in order; `Some(text)` expects a
    /// frame with those bytes and `None` expects the decoder to report that no frame is ready.
    /// `max_length` selects between the limited and unlimited constructors.
    fn check_frames<F>(mut step: F, max_length: Option<usize>, raw: &str, expected: &[Option<&str>])
    where
        F: FnMut(
            &mut NewlineDelimitedDecoder,
            &mut BytesMut,
        ) -> Result<Option<Bytes>, BoxedFramingError>,
    {
        let mut input = BytesMut::from(raw);
        let mut decoder = match max_length {
            Some(limit) => NewlineDelimitedDecoder::new_with_max_length(limit),
            None => NewlineDelimitedDecoder::new(),
        };

        for &want in expected {
            let got = step(&mut decoder, &mut input).unwrap();
            assert_eq!(got.as_deref(), want.map(str::as_bytes));
        }
    }

    #[test]
    fn decode_bytes_with_newlines() {
        // The unterminated tail (`baz`) is not emitted by plain `decode`.
        check_frames(
            |decoder, buf| decoder.decode(buf),
            None,
            "foo\nbar\nbaz",
            &[Some("foo"), Some("bar"), None],
        );
    }

    #[test]
    fn decode_bytes_with_newlines_trailing() {
        check_frames(
            |decoder, buf| decoder.decode(buf),
            None,
            "foo\nbar\nbaz\n",
            &[Some("foo"), Some("bar"), Some("baz"), None],
        );
    }

    #[test]
    fn decode_bytes_with_newlines_and_max_length() {
        // `barbara` is longer than the 3-byte limit and must be skipped.
        check_frames(
            |decoder, buf| decoder.decode(buf),
            Some(3),
            "foo\nbarbara\nbaz\n",
            &[Some("foo"), Some("baz"), None],
        );
    }

    #[test]
    fn decode_eof_bytes_with_newlines() {
        // At end of stream the unterminated tail (`baz`) is emitted as a final frame.
        check_frames(
            |decoder, buf| decoder.decode_eof(buf),
            None,
            "foo\nbar\nbaz",
            &[Some("foo"), Some("bar"), Some("baz")],
        );
    }

    #[test]
    fn decode_eof_bytes_with_newlines_trailing() {
        check_frames(
            |decoder, buf| decoder.decode_eof(buf),
            None,
            "foo\nbar\nbaz\n",
            &[Some("foo"), Some("bar"), Some("baz"), None],
        );
    }

    #[test]
    fn decode_eof_bytes_with_newlines_and_max_length() {
        // `barbara` is longer than the 3-byte limit and must be skipped.
        check_frames(
            |decoder, buf| decoder.decode_eof(buf),
            Some(3),
            "foo\nbarbara\nbaz\n",
            &[Some("foo"), Some("baz"), None],
        );
    }
}