codecs/decoding/framing/
octet_counting.rs

1use std::io;
2
3use bytes::{Buf, Bytes, BytesMut};
4use tokio_util::codec::{LinesCodec, LinesCodecError};
5use tracing::trace;
6use vector_config::configurable_component;
7
8use super::BoxedFramingError;
9
10/// Config used to build a `OctetCountingDecoder`.
11#[configurable_component]
12#[derive(Debug, Clone, Default)]
13pub struct OctetCountingDecoderConfig {
14    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
15    /// Options for the octet counting decoder.
16    pub octet_counting: OctetCountingDecoderOptions,
17}
18
19impl OctetCountingDecoderConfig {
20    /// Build the `OctetCountingDecoder` from this configuration.
21    pub fn build(&self) -> OctetCountingDecoder {
22        if let Some(max_length) = self.octet_counting.max_length {
23            OctetCountingDecoder::new_with_max_length(max_length)
24        } else {
25            OctetCountingDecoder::new()
26        }
27    }
28}
29
30/// Options for building a `OctetCountingDecoder`.
31#[configurable_component]
32#[derive(Clone, Debug, Default, PartialEq, Eq)]
33pub struct OctetCountingDecoderOptions {
34    /// The maximum length of the byte buffer.
35    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
36    pub max_length: Option<usize>,
37}
38
39/// Codec using the `Octet Counting` format as specified in
40/// <https://tools.ietf.org/html/rfc6587#section-3.4.1>.
41#[derive(Clone, Debug)]
42pub struct OctetCountingDecoder {
43    other: LinesCodec,
44    octet_decoding: Option<State>,
45}
46
/// Progress of the octet-counting decoder between calls.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum State {
    /// A frame is being read normally; nothing is being thrown away.
    NotDiscarding,
    /// An oversized frame is being skipped; the payload is the number of
    /// bytes that still have to be dropped.
    Discarding(usize),
    /// Input is being dropped up to and including the next newline.
    DiscardingToEol,
}
53
54impl OctetCountingDecoder {
55    /// Creates a new `OctetCountingDecoder`.
56    pub fn new() -> Self {
57        Self {
58            other: LinesCodec::new(),
59            octet_decoding: None,
60        }
61    }
62
63    /// Creates a `OctetCountingDecoder` with a maximum frame length limit.
64    pub fn new_with_max_length(max_length: usize) -> Self {
65        Self {
66            other: LinesCodec::new_with_max_length(max_length),
67            octet_decoding: None,
68        }
69    }
70
71    /// Decode a frame.
72    fn octet_decode(
73        &mut self,
74        state: State,
75        src: &mut BytesMut,
76    ) -> Result<Option<Bytes>, LinesCodecError> {
77        // Encoding scheme:
78        //
79        // len ' ' data
80        // |    |  | len number of bytes that contain syslog message
81        // |    |
82        // |    | Separating whitespace
83        // |
84        // | ASCII decimal number of unknown length
85
86        let space_pos = src.iter().position(|&b| b == b' ');
87
88        // If we are discarding, discard to the next newline.
89        let newline_pos = src.iter().position(|&b| b == b'\n');
90
91        match (state, newline_pos, space_pos) {
92            (State::Discarding(chars), _, _) if src.len() >= chars => {
93                // We have a certain number of chars to discard.
94                //
95                // There are enough chars in this frame to discard
96                src.advance(chars);
97                self.octet_decoding = None;
98                Err(LinesCodecError::Io(io::Error::other(
99                    "Frame length limit exceeded",
100                )))
101            }
102
103            (State::Discarding(chars), _, _) => {
104                // We have a certain number of chars to discard.
105                //
106                // There aren't enough in this frame so we need to discard the
107                // entire frame and adjust the amount to discard accordingly.
108                self.octet_decoding = Some(State::Discarding(src.len() - chars));
109                src.advance(src.len());
110                Ok(None)
111            }
112
113            (State::DiscardingToEol, Some(offset), _) => {
114                // When discarding we keep discarding to the next newline.
115                src.advance(offset + 1);
116                self.octet_decoding = None;
117                Err(LinesCodecError::Io(io::Error::other(
118                    "Frame length limit exceeded",
119                )))
120            }
121
122            (State::DiscardingToEol, None, _) => {
123                // There is no newline in this frame.
124                //
125                // Since we don't have a set number of chars we want to discard,
126                // we need to discard to the next newline. Advance as far as we
127                // can to discard the entire frame.
128                src.advance(src.len());
129                Ok(None)
130            }
131
132            (State::NotDiscarding, _, Some(space_pos)) if space_pos < self.other.max_length() => {
133                // Everything looks good.
134                //
135                // We aren't discarding, we have a space that is not beyond our
136                // maximum length. Attempt to parse the bytes as a number which
137                // will hopefully give us a sensible length for our message.
138                let len: usize = match std::str::from_utf8(&src[..space_pos])
139                    .map_err(|_| ())
140                    .and_then(|num| num.parse().map_err(|_| ()))
141                {
142                    Ok(len) => len,
143                    Err(_) => {
144                        // It was not a sensible number.
145                        //
146                        // Advance the buffer past the erroneous bytes to
147                        // prevent us getting stuck in an infinite loop.
148                        src.advance(space_pos + 1);
149                        self.octet_decoding = None;
150                        return Err(LinesCodecError::Io(io::Error::new(
151                            io::ErrorKind::InvalidData,
152                            "Unable to decode message len as number",
153                        )));
154                    }
155                };
156
157                let from = space_pos + 1;
158                let to = from + len;
159
160                if len > self.other.max_length() {
161                    // The length is greater than we want.
162                    //
163                    // We need to discard the entire message.
164                    self.octet_decoding = Some(State::Discarding(len));
165                    src.advance(space_pos + 1);
166
167                    Ok(None)
168                } else if let Some(msg) = src.get(from..to) {
169                    let bytes = match std::str::from_utf8(msg) {
170                        Ok(_) => Bytes::copy_from_slice(msg),
171                        Err(_) => {
172                            // The data was not valid UTF8 :-(.
173                            //
174                            // Advance the buffer past the erroneous bytes to
175                            // prevent us getting stuck in an infinite loop.
176                            src.advance(to);
177                            self.octet_decoding = None;
178                            return Err(LinesCodecError::Io(io::Error::new(
179                                io::ErrorKind::InvalidData,
180                                "Unable to decode message as UTF8",
181                            )));
182                        }
183                    };
184
185                    // We have managed to read the entire message as valid UTF8!
186                    src.advance(to);
187                    self.octet_decoding = None;
188                    Ok(Some(bytes))
189                } else {
190                    // We have an acceptable number of bytes in this message,
191                    // but not all the data was in the frame.
192                    //
193                    // Return `None` to indicate we want more data before we do
194                    // anything else.
195                    Ok(None)
196                }
197            }
198
199            (State::NotDiscarding, Some(newline_pos), _) => {
200                // Beyond maximum length, advance to the newline.
201                src.advance(newline_pos + 1);
202                Err(LinesCodecError::Io(io::Error::other(
203                    "Frame length limit exceeded",
204                )))
205            }
206
207            (State::NotDiscarding, None, _) if src.len() < self.other.max_length() => {
208                // We aren't discarding, but there is no useful character to
209                // tell us what to do next.
210                //
211                // We are still not beyond the max length, so just return `None`
212                // to indicate we need to wait for more data.
213                Ok(None)
214            }
215
216            (State::NotDiscarding, None, _) => {
217                // There is no newline in this frame and we have more data than
218                // we want to handle.
219                //
220                // Advance as far as we can to discard the entire frame.
221                self.octet_decoding = Some(State::DiscardingToEol);
222                src.advance(src.len());
223                Ok(None)
224            }
225        }
226    }
227
228    /// `None` if this is not octet counting encoded.
229    fn checked_decode(
230        &mut self,
231        src: &mut BytesMut,
232    ) -> Option<Result<Option<Bytes>, LinesCodecError>> {
233        if let Some(&first_byte) = src.first()
234            && (49..=57).contains(&first_byte)
235        {
236            // First character is non zero number so we can assume that
237            // octet count framing is used.
238            trace!("Octet counting encoded event detected.");
239            self.octet_decoding = Some(State::NotDiscarding);
240        }
241
242        self.octet_decoding
243            .map(|state| self.octet_decode(state, src))
244    }
245}
246
247impl Default for OctetCountingDecoder {
248    fn default() -> Self {
249        Self::new()
250    }
251}
252
253impl tokio_util::codec::Decoder for OctetCountingDecoder {
254    type Item = Bytes;
255    type Error = BoxedFramingError;
256
257    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
258        if let Some(ret) = self.checked_decode(src) {
259            ret
260        } else {
261            // Octet counting isn't used so fallback to newline codec.
262            self.other
263                .decode(src)
264                .map(|line| line.map(|line| line.into()))
265        }
266        .map_err(Into::into)
267    }
268
269    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
270        if let Some(ret) = self.checked_decode(buf) {
271            ret
272        } else {
273            // Octet counting isn't used so fallback to newline codec.
274            self.other
275                .decode_eof(buf)
276                .map(|line| line.map(|line| line.into()))
277        }
278        .map_err(Into::into)
279    }
280}
281
#[cfg(test)]
mod tests {
    #![allow(clippy::print_stdout)]

    use bytes::BufMut;
    use tokio_util::codec::Decoder;

    use super::*;

    /// Returns a decoder limited to `max_length` bytes together with an empty
    /// buffer that has `capacity` bytes reserved.
    fn setup(max_length: usize, capacity: usize) -> (OctetCountingDecoder, BytesMut) {
        (
            OctetCountingDecoder::new_with_max_length(max_length),
            BytesMut::with_capacity(capacity),
        )
    }

    #[test]
    fn non_octet_decode_works_with_multiple_frames() {
        let (mut decoder, mut buffer) = setup(128, 16);

        // No newline yet, so the fallback codec has to wait.
        buffer.put_slice(b"<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were ");
        assert!(matches!(decoder.decode(&mut buffer), Ok(None)));

        buffer.put_slice(b"8 penguins in the shop.\n");
        let frame = decoder.decode(&mut buffer).ok().flatten();
        assert_eq!(
            frame,
            Some(Bytes::from(
                "<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were 8 penguins in the shop."
            ))
        );
    }

    #[test]
    fn octet_decode_works_with_multiple_frames() {
        let (mut decoder, mut buffer) = setup(30, 16);

        buffer.put_slice(b"28 abcdefghijklm");
        assert!(matches!(decoder.decode(&mut buffer), Ok(None)));

        // Sending another frame starting with a number should not cause it to
        // try to decode a new message.
        buffer.put_slice(b"3 nopqrstuvwxyz");
        let frame = decoder.decode(&mut buffer).ok().flatten();
        assert_eq!(frame, Some(Bytes::from("abcdefghijklm3 nopqrstuvwxyz")));
    }

    #[test]
    fn octet_decode_moves_past_invalid_length() {
        let (mut decoder, mut buffer) = setup(16, 16);

        // An invalid syslog message that starts with a digit so we think it is starting with the len.
        buffer.put_slice(b"232>1 zork");
        let outcome = decoder.decode(&mut buffer);

        assert!(outcome.is_err());
        assert_eq!(&buffer[..], &b"zork"[..]);
    }

    #[test]
    fn octet_decode_moves_past_invalid_utf8() {
        let (mut decoder, mut buffer) = setup(16, 16);

        // An invalid syslog message containing invalid utf8 bytes.
        buffer.put_slice(&[b'4', b' ', 0xf0, 0x28, 0x8c, 0xbc]);
        let outcome = decoder.decode(&mut buffer);

        assert!(outcome.is_err());
        assert!(buffer.is_empty());
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length() {
        let (mut decoder, mut buffer) = setup(16, 32);

        buffer.put_slice(
            b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit\n",
        );
        let outcome = decoder.decode(&mut buffer);

        assert!(outcome.is_err());
        assert!(buffer.is_empty());
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length() {
        let (mut decoder, mut buffer) = setup(16, 32);

        buffer.put_slice(b"26 abcdefghijklmnopqrstuvwxyzand here we are");

        // The first call only notices the oversized length prefix.
        assert!(matches!(decoder.decode(&mut buffer), Ok(None)));

        // The second call drops the payload and reports the error.
        let outcome = decoder.decode(&mut buffer);
        assert!(outcome.is_err());
        assert_eq!(&buffer[..], &b"and here we are"[..]);
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length_multiple_frames() {
        let (mut decoder, mut buffer) = setup(16, 32);

        buffer.put_slice(b"26 abc");
        let _ = decoder.decode(&mut buffer);

        buffer.put_slice(b"defghijklmnopqrstuvwxyzand here we are");
        let result = decoder.decode(&mut buffer);

        println!("{result:?}");
        assert!(result.is_err());
        assert_eq!(&buffer[..], &b"and here we are"[..]);
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length_multiple_frames() {
        let (mut decoder, mut buffer) = setup(16, 32);

        buffer.put_slice(
            b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit",
        );
        let _ = decoder.decode(&mut buffer);
        assert_eq!(decoder.octet_decoding, Some(State::DiscardingToEol));

        buffer.put_slice(b"wemustcontinuetodiscard\n32 something valid");
        let outcome = decoder.decode(&mut buffer);

        assert!(outcome.is_err());
        assert_eq!(&buffer[..], &b"32 something valid"[..]);
    }
}