codecs/decoding/framing/octet_counting.rs

1use std::io;
2
3use bytes::{Buf, Bytes, BytesMut};
4use derivative::Derivative;
5use tokio_util::codec::{LinesCodec, LinesCodecError};
6use tracing::trace;
7use vector_config::configurable_component;
8
9use super::BoxedFramingError;
10
/// Config used to build a `OctetCountingDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct OctetCountingDecoderConfig {
    // `default`: a missing field deserializes to `OctetCountingDecoderOptions::default()`.
    // `skip_serializing_if`: presumably omits the field when it equals its default
    // (helper lives in `vector_core::serde` — confirm there).
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    /// Options for the octet counting decoder.
    pub octet_counting: OctetCountingDecoderOptions,
}
19
20impl OctetCountingDecoderConfig {
21    /// Build the `OctetCountingDecoder` from this configuration.
22    pub fn build(&self) -> OctetCountingDecoder {
23        if let Some(max_length) = self.octet_counting.max_length {
24            OctetCountingDecoder::new_with_max_length(max_length)
25        } else {
26            OctetCountingDecoder::new()
27        }
28    }
29}
30
/// Options for building a `OctetCountingDecoder`.
#[configurable_component]
// `Derivative` is used here only to derive `Default` (via `#[derivative(Default)]`).
#[derive(Clone, Debug, Derivative, PartialEq, Eq)]
#[derivative(Default)]
pub struct OctetCountingDecoderOptions {
    // `None` makes `OctetCountingDecoderConfig::build` fall back to
    // `OctetCountingDecoder::new()`; `Some(n)` is passed to
    // `OctetCountingDecoder::new_with_max_length(n)`.
    /// The maximum length of the byte buffer.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,
}
40
/// Codec using the `Octet Counting` format as specified in
/// <https://tools.ietf.org/html/rfc6587#section-3.4.1>.
///
/// Input whose first byte is not a non-zero ASCII digit is decoded as
/// newline-delimited lines instead.
#[derive(Clone, Debug)]
pub struct OctetCountingDecoder {
    /// Newline-delimited fallback codec. Its `max_length()` is also used as the
    /// frame length limit for octet-counted frames.
    other: LinesCodec,
    /// Current octet-counting state, or `None` when input is handed to the
    /// newline-delimited fallback.
    octet_decoding: Option<State>,
}
48
/// Progress of the octet-counting decoder between calls.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Expecting a `<len> <data>` frame at the start of the buffer.
    NotDiscarding,
    /// Dropping an over-long frame; holds the number of bytes still to skip.
    Discarding(usize),
    /// Dropping input up to and including the next newline.
    DiscardingToEol,
}
55
56impl OctetCountingDecoder {
57    /// Creates a new `OctetCountingDecoder`.
58    pub fn new() -> Self {
59        Self {
60            other: LinesCodec::new(),
61            octet_decoding: None,
62        }
63    }
64
65    /// Creates a `OctetCountingDecoder` with a maximum frame length limit.
66    pub fn new_with_max_length(max_length: usize) -> Self {
67        Self {
68            other: LinesCodec::new_with_max_length(max_length),
69            octet_decoding: None,
70        }
71    }
72
73    /// Decode a frame.
74    fn octet_decode(
75        &mut self,
76        state: State,
77        src: &mut BytesMut,
78    ) -> Result<Option<Bytes>, LinesCodecError> {
79        // Encoding scheme:
80        //
81        // len ' ' data
82        // |    |  | len number of bytes that contain syslog message
83        // |    |
84        // |    | Separating whitespace
85        // |
86        // | ASCII decimal number of unknown length
87
88        let space_pos = src.iter().position(|&b| b == b' ');
89
90        // If we are discarding, discard to the next newline.
91        let newline_pos = src.iter().position(|&b| b == b'\n');
92
93        match (state, newline_pos, space_pos) {
94            (State::Discarding(chars), _, _) if src.len() >= chars => {
95                // We have a certain number of chars to discard.
96                //
97                // There are enough chars in this frame to discard
98                src.advance(chars);
99                self.octet_decoding = None;
100                Err(LinesCodecError::Io(io::Error::other(
101                    "Frame length limit exceeded",
102                )))
103            }
104
105            (State::Discarding(chars), _, _) => {
106                // We have a certain number of chars to discard.
107                //
108                // There aren't enough in this frame so we need to discard the
109                // entire frame and adjust the amount to discard accordingly.
110                self.octet_decoding = Some(State::Discarding(src.len() - chars));
111                src.advance(src.len());
112                Ok(None)
113            }
114
115            (State::DiscardingToEol, Some(offset), _) => {
116                // When discarding we keep discarding to the next newline.
117                src.advance(offset + 1);
118                self.octet_decoding = None;
119                Err(LinesCodecError::Io(io::Error::other(
120                    "Frame length limit exceeded",
121                )))
122            }
123
124            (State::DiscardingToEol, None, _) => {
125                // There is no newline in this frame.
126                //
127                // Since we don't have a set number of chars we want to discard,
128                // we need to discard to the next newline. Advance as far as we
129                // can to discard the entire frame.
130                src.advance(src.len());
131                Ok(None)
132            }
133
134            (State::NotDiscarding, _, Some(space_pos)) if space_pos < self.other.max_length() => {
135                // Everything looks good.
136                //
137                // We aren't discarding, we have a space that is not beyond our
138                // maximum length. Attempt to parse the bytes as a number which
139                // will hopefully give us a sensible length for our message.
140                let len: usize = match std::str::from_utf8(&src[..space_pos])
141                    .map_err(|_| ())
142                    .and_then(|num| num.parse().map_err(|_| ()))
143                {
144                    Ok(len) => len,
145                    Err(_) => {
146                        // It was not a sensible number.
147                        //
148                        // Advance the buffer past the erroneous bytes to
149                        // prevent us getting stuck in an infinite loop.
150                        src.advance(space_pos + 1);
151                        self.octet_decoding = None;
152                        return Err(LinesCodecError::Io(io::Error::new(
153                            io::ErrorKind::InvalidData,
154                            "Unable to decode message len as number",
155                        )));
156                    }
157                };
158
159                let from = space_pos + 1;
160                let to = from + len;
161
162                if len > self.other.max_length() {
163                    // The length is greater than we want.
164                    //
165                    // We need to discard the entire message.
166                    self.octet_decoding = Some(State::Discarding(len));
167                    src.advance(space_pos + 1);
168
169                    Ok(None)
170                } else if let Some(msg) = src.get(from..to) {
171                    let bytes = match std::str::from_utf8(msg) {
172                        Ok(_) => Bytes::copy_from_slice(msg),
173                        Err(_) => {
174                            // The data was not valid UTF8 :-(.
175                            //
176                            // Advance the buffer past the erroneous bytes to
177                            // prevent us getting stuck in an infinite loop.
178                            src.advance(to);
179                            self.octet_decoding = None;
180                            return Err(LinesCodecError::Io(io::Error::new(
181                                io::ErrorKind::InvalidData,
182                                "Unable to decode message as UTF8",
183                            )));
184                        }
185                    };
186
187                    // We have managed to read the entire message as valid UTF8!
188                    src.advance(to);
189                    self.octet_decoding = None;
190                    Ok(Some(bytes))
191                } else {
192                    // We have an acceptable number of bytes in this message,
193                    // but not all the data was in the frame.
194                    //
195                    // Return `None` to indicate we want more data before we do
196                    // anything else.
197                    Ok(None)
198                }
199            }
200
201            (State::NotDiscarding, Some(newline_pos), _) => {
202                // Beyond maximum length, advance to the newline.
203                src.advance(newline_pos + 1);
204                Err(LinesCodecError::Io(io::Error::other(
205                    "Frame length limit exceeded",
206                )))
207            }
208
209            (State::NotDiscarding, None, _) if src.len() < self.other.max_length() => {
210                // We aren't discarding, but there is no useful character to
211                // tell us what to do next.
212                //
213                // We are still not beyond the max length, so just return `None`
214                // to indicate we need to wait for more data.
215                Ok(None)
216            }
217
218            (State::NotDiscarding, None, _) => {
219                // There is no newline in this frame and we have more data than
220                // we want to handle.
221                //
222                // Advance as far as we can to discard the entire frame.
223                self.octet_decoding = Some(State::DiscardingToEol);
224                src.advance(src.len());
225                Ok(None)
226            }
227        }
228    }
229
230    /// `None` if this is not octet counting encoded.
231    fn checked_decode(
232        &mut self,
233        src: &mut BytesMut,
234    ) -> Option<Result<Option<Bytes>, LinesCodecError>> {
235        if let Some(&first_byte) = src.first() {
236            if (49..=57).contains(&first_byte) {
237                // First character is non zero number so we can assume that
238                // octet count framing is used.
239                trace!("Octet counting encoded event detected.");
240                self.octet_decoding = Some(State::NotDiscarding);
241            }
242        }
243
244        self.octet_decoding
245            .map(|state| self.octet_decode(state, src))
246    }
247}
248
249impl Default for OctetCountingDecoder {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255impl tokio_util::codec::Decoder for OctetCountingDecoder {
256    type Item = Bytes;
257    type Error = BoxedFramingError;
258
259    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
260        if let Some(ret) = self.checked_decode(src) {
261            ret
262        } else {
263            // Octet counting isn't used so fallback to newline codec.
264            self.other
265                .decode(src)
266                .map(|line| line.map(|line| line.into()))
267        }
268        .map_err(Into::into)
269    }
270
271    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
272        if let Some(ret) = self.checked_decode(buf) {
273            ret
274        } else {
275            // Octet counting isn't used so fallback to newline codec.
276            self.other
277                .decode_eof(buf)
278                .map(|line| line.map(|line| line.into()))
279        }
280        .map_err(Into::into)
281    }
282}
283
#[cfg(test)]
mod tests {
    #![allow(clippy::print_stdout)]

    use bytes::BufMut;
    use tokio_util::codec::Decoder;

    use super::*;

    /// Creates a decoder limited to `max_length` bytes and an empty input buffer
    /// with the given initial `capacity`.
    fn setup(max_length: usize, capacity: usize) -> (OctetCountingDecoder, BytesMut) {
        (
            OctetCountingDecoder::new_with_max_length(max_length),
            BytesMut::with_capacity(capacity),
        )
    }

    #[test]
    fn non_octet_decode_works_with_multiple_frames() {
        let (mut decoder, mut input) = setup(128, 16);

        // A line that does not start with a digit goes to the newline fallback
        // and only completes once the newline arrives.
        input.put_slice(b"<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were ");
        assert_eq!(decoder.decode(&mut input).map_err(|_| true), Ok(None));

        input.put_slice(b"8 penguins in the shop.\n");
        let expected: Bytes =
            "<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were 8 penguins in the shop.".into();
        assert_eq!(
            decoder.decode(&mut input).map_err(|_| true),
            Ok(Some(expected))
        );
    }

    #[test]
    fn octet_decode_works_with_multiple_frames() {
        let (mut decoder, mut input) = setup(30, 16);

        input.put_slice(b"28 abcdefghijklm");
        assert_eq!(decoder.decode(&mut input).map_err(|_| false), Ok(None));

        // Sending another frame starting with a number should not cause it to
        // try to decode a new message.
        input.put_slice(b"3 nopqrstuvwxyz");
        let expected: Bytes = "abcdefghijklm3 nopqrstuvwxyz".into();
        assert_eq!(
            decoder.decode(&mut input).map_err(|_| false),
            Ok(Some(expected))
        );
    }

    #[test]
    fn octet_decode_moves_past_invalid_length() {
        let (mut decoder, mut input) = setup(16, 16);

        // An invalid syslog message that starts with a digit so we think it is starting with the len.
        input.put_slice(b"232>1 zork");

        assert!(decoder.decode(&mut input).is_err());
        assert_eq!(&input[..], b"zork");
    }

    #[test]
    fn octet_decode_moves_past_invalid_utf8() {
        let (mut decoder, mut input) = setup(16, 16);

        // An invalid syslog message containing invalid utf8 bytes.
        input.put_slice(&[b'4', b' ', 0xf0, 0x28, 0x8c, 0xbc]);

        assert!(decoder.decode(&mut input).is_err());
        assert!(input.is_empty());
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length() {
        let (mut decoder, mut input) = setup(16, 32);

        input.put_slice(b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit\n");

        assert!(decoder.decode(&mut input).is_err());
        assert!(input.is_empty());
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length() {
        let (mut decoder, mut input) = setup(16, 32);

        input.put_slice(b"26 abcdefghijklmnopqrstuvwxyzand here we are");
        // The oversized header is consumed first...
        assert_eq!(decoder.decode(&mut input).map_err(|_| false), Ok(None));
        // ...then the declared 26 bytes are dropped with an error.
        assert!(decoder.decode(&mut input).is_err());
        assert_eq!(&input[..], b"and here we are");
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length_multiple_frames() {
        let (mut decoder, mut input) = setup(16, 32);

        input.put_slice(b"26 abc");
        let _ = decoder.decode(&mut input);

        input.put_slice(b"defghijklmnopqrstuvwxyzand here we are");
        let result = decoder.decode(&mut input);

        println!("{result:?}");
        assert!(result.is_err());
        assert_eq!(&input[..], b"and here we are");
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length_multiple_frames() {
        let (mut decoder, mut input) = setup(16, 32);

        input.put_slice(b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit");
        let _ = decoder.decode(&mut input);
        assert_eq!(decoder.octet_decoding, Some(State::DiscardingToEol));

        input.put_slice(b"wemustcontinuetodiscard\n32 something valid");

        assert!(decoder.decode(&mut input).is_err());
        assert_eq!(&input[..], b"32 something valid");
    }
}