// codecs/decoding/framing/character_delimited.rs

use bytes::{Buf, Bytes, BytesMut};
use memchr::memchr;
use tokio_util::codec::Decoder;
use tracing::{trace, warn};
use vector_config::configurable_component;

use super::BoxedFramingError;

9/// Config used to build a `CharacterDelimitedDecoder`.
10#[configurable_component]
11#[derive(Debug, Clone)]
12pub struct CharacterDelimitedDecoderConfig {
13    /// Options for the character delimited decoder.
14    pub character_delimited: CharacterDelimitedDecoderOptions,
15}
16
17impl CharacterDelimitedDecoderConfig {
18    /// Creates a `CharacterDelimitedDecoderConfig` with the specified delimiter and default max length.
19    pub const fn new(delimiter: u8) -> Self {
20        Self {
21            character_delimited: CharacterDelimitedDecoderOptions::new(delimiter, None),
22        }
23    }
24    /// Build the `CharacterDelimitedDecoder` from this configuration.
25    pub const fn build(&self) -> CharacterDelimitedDecoder {
26        if let Some(max_length) = self.character_delimited.max_length {
27            CharacterDelimitedDecoder::new_with_max_length(
28                self.character_delimited.delimiter,
29                max_length,
30            )
31        } else {
32            CharacterDelimitedDecoder::new(self.character_delimited.delimiter)
33        }
34    }
35}
36
37/// Options for building a `CharacterDelimitedDecoder`.
38#[configurable_component]
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct CharacterDelimitedDecoderOptions {
41    /// The character that delimits byte sequences.
42    #[configurable(metadata(docs::type_override = "ascii_char"))]
43    #[serde(with = "vector_core::serde::ascii_char")]
44    pub delimiter: u8,
45
46    /// The maximum length of the byte buffer.
47    ///
48    /// This length does *not* include the trailing delimiter.
49    ///
50    /// By default, there is no maximum length enforced. If events are malformed, this can lead to
51    /// additional resource usage as events continue to be buffered in memory, and can potentially
52    /// lead to memory exhaustion in extreme cases.
53    ///
54    /// If there is a risk of processing malformed data, such as logs with user-controlled input,
55    /// consider setting the maximum length to a reasonably large value as a safety net. This
56    /// ensures that processing is not actually unbounded.
57    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
58    pub max_length: Option<usize>,
59}
60
61impl CharacterDelimitedDecoderOptions {
62    /// Create a `CharacterDelimitedDecoderOptions` with a delimiter and optional max_length.
63    pub const fn new(delimiter: u8, max_length: Option<usize>) -> Self {
64        Self {
65            delimiter,
66            max_length,
67        }
68    }
69}
70
/// A decoder for handling bytes that are delimited by a chosen single-byte character.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CharacterDelimitedDecoder {
    /// The delimiter byte used to separate byte sequences.
    pub delimiter: u8,
    /// The maximum length, in bytes, of a single frame.
    pub max_length: usize,
}

80impl CharacterDelimitedDecoder {
81    /// Creates a `CharacterDelimitedDecoder` with the specified delimiter.
82    pub const fn new(delimiter: u8) -> Self {
83        CharacterDelimitedDecoder {
84            delimiter,
85            max_length: usize::MAX,
86        }
87    }
88
89    /// Creates a `CharacterDelimitedDecoder` with a maximum frame length limit.
90    ///
91    /// Any frames longer than `max_length` bytes will be discarded entirely.
92    pub const fn new_with_max_length(delimiter: u8, max_length: usize) -> Self {
93        CharacterDelimitedDecoder {
94            max_length,
95            ..CharacterDelimitedDecoder::new(delimiter)
96        }
97    }
98
99    /// Returns the maximum frame length when decoding.
100    pub const fn max_length(&self) -> usize {
101        self.max_length
102    }
103}
104
105impl Decoder for CharacterDelimitedDecoder {
106    type Item = Bytes;
107    type Error = BoxedFramingError;
108
109    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
110        loop {
111            // This function has the following goal: we are searching for
112            // sub-buffers delimited by `self.delimiter` with size no more than
113            // `self.max_length`. If a sub-buffer is found that exceeds
114            // `self.max_length` we discard it, else we return it. At the end of
115            // the buffer if the delimiter is not present the remainder of the
116            // buffer is discarded.
117            match memchr(self.delimiter, buf) {
118                None => return Ok(None),
119                Some(next_delimiter_idx) => {
120                    if next_delimiter_idx > self.max_length {
121                        // The discovered sub-buffer is too big, so we discard
122                        // it, taking care to also discard the delimiter.
123                        warn!(
124                            message = "Discarding frame larger than max_length.",
125                            buf_len = buf.len(),
126                            max_length = self.max_length
127                        );
128                        buf.advance(next_delimiter_idx + 1);
129                    } else {
130                        let frame = buf.split_to(next_delimiter_idx).freeze();
131                        trace!(
132                            message = "Decoding the frame.",
133                            bytes_processed = frame.len()
134                        );
135                        buf.advance(1); // scoot past the delimiter
136                        return Ok(Some(frame));
137                    }
138                }
139            }
140        }
141    }
142
143    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
144        match self.decode(buf)? {
145            Some(frame) => Ok(Some(frame)),
146            None => {
147                if buf.is_empty() {
148                    Ok(None)
149                } else if buf.len() > self.max_length {
150                    warn!(
151                        message = "Discarding frame larger than max_length.",
152                        buf_len = buf.len(),
153                        max_length = self.max_length
154                    );
155                    Ok(None)
156                } else {
157                    let bytes: Bytes = buf.split_to(buf.len()).freeze();
158                    Ok(Some(bytes))
159                }
160            }
161        }
162    }
163}
164
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::BufMut;
    use indoc::indoc;

    use super::*;

    /// A single delimited frame is returned without its delimiter.
    #[test]
    fn decode() {
        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();
        buf.put_slice(b"abc\n");
        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("abc")));
    }

    /// Frames longer than the limit are skipped by both `decode` and `decode_eof`; `decode_eof`
    /// additionally yields a short undelimited tail.
    #[test]
    fn decode_max_length() {
        const MAX_LENGTH: usize = 6;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        // limit is 6 so it will skip longer lines
        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode(buf).unwrap(), None);

        let buf = &mut BytesMut::new();

        // limit is 6 so it will skip longer lines
        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123")));
        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    /// An undelimited remainder longer than the limit yields no frame at end of input.
    #[test]
    fn decode_eof_oversized_remainder() {
        const MAX_LENGTH: usize = 6;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        buf.put_slice(b"1234567");

        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    // Regression test for [infinite loop bug](https://github.com/vectordotdev/vector/issues/2564)
    // Derived from https://github.com/tokio-rs/tokio/issues/1483
    #[test]
    fn decode_discard_repeat() {
        const MAX_LENGTH: usize = 1;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        buf.reserve(200);
        buf.put(&b"aa"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
        buf.put(&b"a"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
    }

    /// A newline inside a JSON string is serialized as an escape sequence and therefore does not
    /// split the frame.
    #[test]
    fn decode_json_escaped() {
        let mut input = HashMap::new();
        input.insert("key", "value");
        input.insert("new", "li\nne");

        let mut bytes = serde_json::to_vec(&input).unwrap();
        bytes.push(b'\n');

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        buf.extend_from_slice(&bytes);

        let result = codec.decode(buf).unwrap();

        assert!(result.is_some());
        assert!(buf.is_empty());
    }

    /// Every line of a multi-line JSON log dump is decoded as its own frame.
    #[test]
    fn decode_json_multiline() {
        let events = indoc! {r#"
            {"log":"\u0009at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)\n","stream":"stdout","time":"2019-01-18T07:49:27.374616758Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374640288Z"}
            {"log":"\u0009at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)\n","stream":"stdout","time":"2019-01-18T07:49:27.374655505Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374671955Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374690312Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)\n","stream":"stdout","time":"2019-01-18T07:49:27.374704522Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)\n","stream":"stdout","time":"2019-01-18T07:49:27.374718459Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)\n","stream":"stdout","time":"2019-01-18T07:49:27.374732919Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)\n","stream":"stdout","time":"2019-01-18T07:49:27.374750799Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374764819Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374778682Z"}
            {"log":"\u0009at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\n","stream":"stdout","time":"2019-01-18T07:49:27.374792429Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374805985Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374819625Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374833335Z"}
            {"log":"\u0009at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)\n","stream":"stdout","time":"2019-01-18T07:49:27.374847845Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374861925Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37487589Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374890043Z"}
            {"log":"\u0009at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)\n","stream":"stdout","time":"2019-01-18T07:49:27.374903813Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374917793Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374931586Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374946006Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)\n","stream":"stdout","time":"2019-01-18T07:49:27.37496104Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106)\n","stream":"stdout","time":"2019-01-18T07:49:27.37498773Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.375003113Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.375017063Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.37503086Z"}
            {"log":"\u0009at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)\n","stream":"stdout","time":"2019-01-18T07:49:27.3750454Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.37505928Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37507306Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.375086726Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\n","stream":"stdout","time":"2019-01-18T07:49:27.375100817Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)\n","stream":"stdout","time":"2019-01-18T07:49:27.375115354Z"}
            {"log":"\u0009at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)\n","stream":"stdout","time":"2019-01-18T07:49:27.375129454Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\n","stream":"stdout","time":"2019-01-18T07:49:27.375144001Z"}
            {"log":"\u0009at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)\n","stream":"stdout","time":"2019-01-18T07:49:27.375157464Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\n","stream":"stdout","time":"2019-01-18T07:49:27.375170981Z"}
            {"log":"\u0009at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)\n","stream":"stdout","time":"2019-01-18T07:49:27.375184417Z"}
            {"log":"\u0009at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:800)\n","stream":"stdout","time":"2019-01-18T07:49:27.375198024Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\n","stream":"stdout","time":"2019-01-18T07:49:27.375211594Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)\n","stream":"stdout","time":"2019-01-18T07:49:27.375225237Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)\n","stream":"stdout","time":"2019-01-18T07:49:27.375239487Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\n","stream":"stdout","time":"2019-01-18T07:49:27.375253464Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n","stream":"stdout","time":"2019-01-18T07:49:27.375323255Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n","stream":"stdout","time":"2019-01-18T07:49:27.375345642Z"}
            {"log":"\u0009at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\n","stream":"stdout","time":"2019-01-18T07:49:27.375363208Z"}
            {"log":"\u0009at java.lang.Thread.run(Thread.java:748)\n","stream":"stdout","time":"2019-01-18T07:49:27.375377695Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375391335Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375416915Z"}
            {"log":"2019-01-18 07:53:06.419 [               ]  INFO 1 --- [vent-bus.prod-1] c.t.listener.CommonListener              : warehousing Dailywarehousing.daily\n","stream":"stdout","time":"2019-01-18T07:53:06.420527437Z"}
        "#};

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        buf.extend_from_slice(events.as_bytes());

        let mut i = 0;
        while codec.decode(buf).unwrap().is_some() {
            i += 1;
        }

        assert_eq!(i, 51);
    }
}