codecs/decoding/framing/character_delimited.rs

1use bytes::{Buf, Bytes, BytesMut};
2use memchr::memchr;
3use tokio_util::codec::Decoder;
4use tracing::{trace, warn};
5use vector_config::configurable_component;
6
7use super::BoxedFramingError;
8
9/// Config used to build a `CharacterDelimitedDecoder`.
10#[configurable_component]
11#[derive(Debug, Clone)]
12pub struct CharacterDelimitedDecoderConfig {
13    /// Options for the character delimited decoder.
14    pub character_delimited: CharacterDelimitedDecoderOptions,
15}
16
17impl CharacterDelimitedDecoderConfig {
18    /// Creates a `CharacterDelimitedDecoderConfig` with the specified delimiter and default max length.
19    pub const fn new(delimiter: u8) -> Self {
20        Self {
21            character_delimited: CharacterDelimitedDecoderOptions::new(delimiter, None),
22        }
23    }
24    /// Build the `CharacterDelimitedDecoder` from this configuration.
25    pub const fn build(&self) -> CharacterDelimitedDecoder {
26        if let Some(max_length) = self.character_delimited.max_length {
27            CharacterDelimitedDecoder::new_with_max_length(
28                self.character_delimited.delimiter,
29                max_length,
30            )
31        } else {
32            CharacterDelimitedDecoder::new(self.character_delimited.delimiter)
33        }
34    }
35}
36
37/// Options for building a `CharacterDelimitedDecoder`.
38#[configurable_component]
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct CharacterDelimitedDecoderOptions {
41    /// The character that delimits byte sequences.
42    #[configurable(metadata(docs::type_override = "ascii_char"))]
43    #[serde(with = "vector_core::serde::ascii_char")]
44    pub delimiter: u8,
45
46    /// The maximum length of the byte buffer.
47    ///
48    /// This length does *not* include the trailing delimiter.
49    ///
50    /// By default, there is no maximum length enforced. If events are malformed, this can lead to
51    /// additional resource usage as events continue to be buffered in memory, and can potentially
52    /// lead to memory exhaustion in extreme cases.
53    ///
54    /// If there is a risk of processing malformed data, such as logs with user-controlled input,
55    /// consider setting the maximum length to a reasonably large value as a safety net. This
56    /// ensures that processing is not actually unbounded.
57    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
58    pub max_length: Option<usize>,
59}
60
61impl CharacterDelimitedDecoderOptions {
62    /// Create a `CharacterDelimitedDecoderOptions` with a delimiter and optional max_length.
63    pub const fn new(delimiter: u8, max_length: Option<usize>) -> Self {
64        Self {
65            delimiter,
66            max_length,
67        }
68    }
69}
70
/// A decoder that splits a byte stream into frames separated by a single delimiter byte.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CharacterDelimitedDecoder {
    /// The byte that terminates each frame.
    pub delimiter: u8,
    /// Frames longer than this many bytes (not counting the delimiter) are discarded.
    pub max_length: usize,
}

impl CharacterDelimitedDecoder {
    /// Returns a decoder splitting on `delimiter` whose `max_length` is `usize::MAX`,
    /// i.e. with no practical frame-length limit.
    pub const fn new(delimiter: u8) -> Self {
        Self::new_with_max_length(delimiter, usize::MAX)
    }

    /// Returns a decoder splitting on `delimiter` that discards every frame longer
    /// than `max_length` bytes in its entirety.
    pub const fn new_with_max_length(delimiter: u8, max_length: usize) -> Self {
        Self {
            delimiter,
            max_length,
        }
    }

    /// The configured upper bound on frame length, in bytes.
    pub const fn max_length(&self) -> usize {
        self.max_length
    }
}
104
105impl Decoder for CharacterDelimitedDecoder {
106    type Item = Bytes;
107    type Error = BoxedFramingError;
108
109    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
110        loop {
111            // This function has the following goal: we are searching for
112            // sub-buffers delimited by `self.delimiter` with size no more than
113            // `self.max_length`. If a sub-buffer is found that exceeds
114            // `self.max_length` we discard it, else we return it. At the end of
115            // the buffer if the delimiter is not present the remainder of the
116            // buffer is discarded.
117            match memchr(self.delimiter, buf) {
118                None => return Ok(None),
119                Some(next_delimiter_idx) => {
120                    if next_delimiter_idx > self.max_length {
121                        // The discovered sub-buffer is too big, so we discard
122                        // it, taking care to also discard the delimiter.
123                        warn!(
124                            message = "Discarding frame larger than max_length.",
125                            buf_len = buf.len(),
126                            max_length = self.max_length,
127                            internal_log_rate_limit = true
128                        );
129                        buf.advance(next_delimiter_idx + 1);
130                    } else {
131                        let frame = buf.split_to(next_delimiter_idx).freeze();
132                        trace!(
133                            message = "Decoding the frame.",
134                            bytes_processed = frame.len()
135                        );
136                        buf.advance(1); // scoot past the delimiter
137                        return Ok(Some(frame));
138                    }
139                }
140            }
141        }
142    }
143
144    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
145        match self.decode(buf)? {
146            Some(frame) => Ok(Some(frame)),
147            None => {
148                if buf.is_empty() {
149                    Ok(None)
150                } else if buf.len() > self.max_length {
151                    warn!(
152                        message = "Discarding frame larger than max_length.",
153                        buf_len = buf.len(),
154                        max_length = self.max_length,
155                        internal_log_rate_limit = true
156                    );
157                    Ok(None)
158                } else {
159                    let bytes: Bytes = buf.split_to(buf.len()).freeze();
160                    Ok(Some(bytes))
161                }
162            }
163        }
164    }
165}
166
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::BufMut;
    use indoc::indoc;

    use super::*;

    #[test]
    fn decode() {
        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();
        buf.put_slice(b"abc\n");
        assert_eq!(Some("abc".into()), codec.decode(buf).unwrap());
    }

    #[test]
    fn decode_max_length() {
        const MAX_LENGTH: usize = 6;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        // limit is 6 so it will skip longer lines
        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode(buf).unwrap(), None);

        let buf = &mut BytesMut::new();

        // limit is 6 so it will skip longer lines
        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123")));
        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    // Partial data with no delimiter yet must stay buffered (even when already
    // longer than max_length) and be dropped only once its delimiter shows up.
    #[test]
    fn decode_retains_partial_frame() {
        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', 2);
        let buf = &mut BytesMut::new();

        buf.put_slice(b"abcd");
        assert_eq!(codec.decode(buf).unwrap(), None);
        assert_eq!(&buf[..], &b"abcd"[..]);

        buf.put_slice(b"\nok\n");
        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("ok")));
        assert!(buf.is_empty());
    }

    // At EOF, a trailing undelimited remainder longer than max_length yields no frame.
    #[test]
    fn decode_eof_drops_oversized_remainder() {
        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', 3);
        let buf = &mut BytesMut::new();
        buf.put_slice(b"abc\ntoolong");

        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("abc")));
        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    // Non-newline delimiters work, and two adjacent delimiters produce an empty frame.
    #[test]
    fn decode_custom_delimiter() {
        let mut codec = CharacterDelimitedDecoder::new(b',');
        let buf = &mut BytesMut::from(&b"a,,b"[..]);

        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("a")));
        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::new()));
        assert_eq!(codec.decode(buf).unwrap(), None);
        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("b")));
        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    #[test]
    fn config_build() {
        let unbounded = CharacterDelimitedDecoderConfig::new(b'\n').build();
        assert_eq!(unbounded, CharacterDelimitedDecoder::new(b'\n'));
        assert_eq!(unbounded.max_length(), usize::MAX);

        let bounded = CharacterDelimitedDecoderConfig {
            character_delimited: CharacterDelimitedDecoderOptions::new(b',', Some(5)),
        }
        .build();
        assert_eq!(
            bounded,
            CharacterDelimitedDecoder::new_with_max_length(b',', 5)
        );
    }

    // Regression test for [infinite loop bug](https://github.com/vectordotdev/vector/issues/2564)
    // Derived from https://github.com/tokio-rs/tokio/issues/1483
    #[test]
    fn decode_discard_repeat() {
        const MAX_LENGTH: usize = 1;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        buf.reserve(200);
        buf.put(&b"aa"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
        buf.put(&b"a"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
    }

    #[test]
    fn decode_json_escaped() {
        let mut input = HashMap::new();
        input.insert("key", "value");
        input.insert("new", "li\nne");

        let mut bytes = serde_json::to_vec(&input).unwrap();
        bytes.push(b'\n');

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        buf.reserve(bytes.len());
        buf.extend(bytes);

        let result = codec.decode(buf).unwrap();

        assert!(result.is_some());
        assert!(buf.is_empty());
    }

    #[test]
    fn decode_json_multiline() {
        let events = indoc! {r#"
            {"log":"\u0009at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)\n","stream":"stdout","time":"2019-01-18T07:49:27.374616758Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374640288Z"}
            {"log":"\u0009at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)\n","stream":"stdout","time":"2019-01-18T07:49:27.374655505Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374671955Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374690312Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)\n","stream":"stdout","time":"2019-01-18T07:49:27.374704522Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)\n","stream":"stdout","time":"2019-01-18T07:49:27.374718459Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)\n","stream":"stdout","time":"2019-01-18T07:49:27.374732919Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)\n","stream":"stdout","time":"2019-01-18T07:49:27.374750799Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374764819Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374778682Z"}
            {"log":"\u0009at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\n","stream":"stdout","time":"2019-01-18T07:49:27.374792429Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374805985Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374819625Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374833335Z"}
            {"log":"\u0009at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)\n","stream":"stdout","time":"2019-01-18T07:49:27.374847845Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374861925Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37487589Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374890043Z"}
            {"log":"\u0009at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)\n","stream":"stdout","time":"2019-01-18T07:49:27.374903813Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374917793Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374931586Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374946006Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)\n","stream":"stdout","time":"2019-01-18T07:49:27.37496104Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106)\n","stream":"stdout","time":"2019-01-18T07:49:27.37498773Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.375003113Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.375017063Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.37503086Z"}
            {"log":"\u0009at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)\n","stream":"stdout","time":"2019-01-18T07:49:27.3750454Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.37505928Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37507306Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.375086726Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\n","stream":"stdout","time":"2019-01-18T07:49:27.375100817Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)\n","stream":"stdout","time":"2019-01-18T07:49:27.375115354Z"}
            {"log":"\u0009at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)\n","stream":"stdout","time":"2019-01-18T07:49:27.375129454Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\n","stream":"stdout","time":"2019-01-18T07:49:27.375144001Z"}
            {"log":"\u0009at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)\n","stream":"stdout","time":"2019-01-18T07:49:27.375157464Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\n","stream":"stdout","time":"2019-01-18T07:49:27.375170981Z"}
            {"log":"\u0009at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)\n","stream":"stdout","time":"2019-01-18T07:49:27.375184417Z"}
            {"log":"\u0009at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:800)\n","stream":"stdout","time":"2019-01-18T07:49:27.375198024Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\n","stream":"stdout","time":"2019-01-18T07:49:27.375211594Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)\n","stream":"stdout","time":"2019-01-18T07:49:27.375225237Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)\n","stream":"stdout","time":"2019-01-18T07:49:27.375239487Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\n","stream":"stdout","time":"2019-01-18T07:49:27.375253464Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n","stream":"stdout","time":"2019-01-18T07:49:27.375323255Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n","stream":"stdout","time":"2019-01-18T07:49:27.375345642Z"}
            {"log":"\u0009at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\n","stream":"stdout","time":"2019-01-18T07:49:27.375363208Z"}
            {"log":"\u0009at java.lang.Thread.run(Thread.java:748)\n","stream":"stdout","time":"2019-01-18T07:49:27.375377695Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375391335Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375416915Z"}
            {"log":"2019-01-18 07:53:06.419 [               ]  INFO 1 --- [vent-bus.prod-1] c.t.listener.CommonListener              : warehousing Dailywarehousing.daily\n","stream":"stdout","time":"2019-01-18T07:53:06.420527437Z"}
        "#};

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        buf.extend(events.to_string().as_bytes());

        let mut i = 0;
        while codec.decode(buf).unwrap().is_some() {
            i += 1;
        }

        assert_eq!(i, 51);
    }
}