codecs/decoding/framing/
octet_counting.rs1use std::io;
2
3use bytes::{Buf, Bytes, BytesMut};
4use derivative::Derivative;
5use tokio_util::codec::{LinesCodec, LinesCodecError};
6use tracing::trace;
7use vector_config::configurable_component;
8
9use super::BoxedFramingError;
10
11#[configurable_component]
13#[derive(Debug, Clone, Default)]
14pub struct OctetCountingDecoderConfig {
15 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
16 pub octet_counting: OctetCountingDecoderOptions,
18}
19
20impl OctetCountingDecoderConfig {
21 pub fn build(&self) -> OctetCountingDecoder {
23 if let Some(max_length) = self.octet_counting.max_length {
24 OctetCountingDecoder::new_with_max_length(max_length)
25 } else {
26 OctetCountingDecoder::new()
27 }
28 }
29}
30
31#[configurable_component]
33#[derive(Clone, Debug, Derivative, PartialEq, Eq)]
34#[derivative(Default)]
35pub struct OctetCountingDecoderOptions {
36 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
38 pub max_length: Option<usize>,
39}
40
41#[derive(Clone, Debug)]
44pub struct OctetCountingDecoder {
45 other: LinesCodec,
46 octet_decoding: Option<State>,
47}
48
/// Where the octet-counting decoder is within the current frame.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Reading a frame normally: expecting, or partway through, `LEN SP DATA`.
    NotDiscarding,
    /// Skipping an oversized frame; holds the number of bytes still to skip.
    Discarding(usize),
    /// Skipping malformed input up to and including the next newline.
    DiscardingToEol,
}
55
56impl OctetCountingDecoder {
57 pub fn new() -> Self {
59 Self {
60 other: LinesCodec::new(),
61 octet_decoding: None,
62 }
63 }
64
65 pub fn new_with_max_length(max_length: usize) -> Self {
67 Self {
68 other: LinesCodec::new_with_max_length(max_length),
69 octet_decoding: None,
70 }
71 }
72
73 fn octet_decode(
75 &mut self,
76 state: State,
77 src: &mut BytesMut,
78 ) -> Result<Option<Bytes>, LinesCodecError> {
79 let space_pos = src.iter().position(|&b| b == b' ');
89
90 let newline_pos = src.iter().position(|&b| b == b'\n');
92
93 match (state, newline_pos, space_pos) {
94 (State::Discarding(chars), _, _) if src.len() >= chars => {
95 src.advance(chars);
99 self.octet_decoding = None;
100 Err(LinesCodecError::Io(io::Error::other(
101 "Frame length limit exceeded",
102 )))
103 }
104
105 (State::Discarding(chars), _, _) => {
106 self.octet_decoding = Some(State::Discarding(src.len() - chars));
111 src.advance(src.len());
112 Ok(None)
113 }
114
115 (State::DiscardingToEol, Some(offset), _) => {
116 src.advance(offset + 1);
118 self.octet_decoding = None;
119 Err(LinesCodecError::Io(io::Error::other(
120 "Frame length limit exceeded",
121 )))
122 }
123
124 (State::DiscardingToEol, None, _) => {
125 src.advance(src.len());
131 Ok(None)
132 }
133
134 (State::NotDiscarding, _, Some(space_pos)) if space_pos < self.other.max_length() => {
135 let len: usize = match std::str::from_utf8(&src[..space_pos])
141 .map_err(|_| ())
142 .and_then(|num| num.parse().map_err(|_| ()))
143 {
144 Ok(len) => len,
145 Err(_) => {
146 src.advance(space_pos + 1);
151 self.octet_decoding = None;
152 return Err(LinesCodecError::Io(io::Error::new(
153 io::ErrorKind::InvalidData,
154 "Unable to decode message len as number",
155 )));
156 }
157 };
158
159 let from = space_pos + 1;
160 let to = from + len;
161
162 if len > self.other.max_length() {
163 self.octet_decoding = Some(State::Discarding(len));
167 src.advance(space_pos + 1);
168
169 Ok(None)
170 } else if let Some(msg) = src.get(from..to) {
171 let bytes = match std::str::from_utf8(msg) {
172 Ok(_) => Bytes::copy_from_slice(msg),
173 Err(_) => {
174 src.advance(to);
179 self.octet_decoding = None;
180 return Err(LinesCodecError::Io(io::Error::new(
181 io::ErrorKind::InvalidData,
182 "Unable to decode message as UTF8",
183 )));
184 }
185 };
186
187 src.advance(to);
189 self.octet_decoding = None;
190 Ok(Some(bytes))
191 } else {
192 Ok(None)
198 }
199 }
200
201 (State::NotDiscarding, Some(newline_pos), _) => {
202 src.advance(newline_pos + 1);
204 Err(LinesCodecError::Io(io::Error::other(
205 "Frame length limit exceeded",
206 )))
207 }
208
209 (State::NotDiscarding, None, _) if src.len() < self.other.max_length() => {
210 Ok(None)
216 }
217
218 (State::NotDiscarding, None, _) => {
219 self.octet_decoding = Some(State::DiscardingToEol);
224 src.advance(src.len());
225 Ok(None)
226 }
227 }
228 }
229
230 fn checked_decode(
232 &mut self,
233 src: &mut BytesMut,
234 ) -> Option<Result<Option<Bytes>, LinesCodecError>> {
235 if let Some(&first_byte) = src.first() {
236 if (49..=57).contains(&first_byte) {
237 trace!("Octet counting encoded event detected.");
240 self.octet_decoding = Some(State::NotDiscarding);
241 }
242 }
243
244 self.octet_decoding
245 .map(|state| self.octet_decode(state, src))
246 }
247}
248
249impl Default for OctetCountingDecoder {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255impl tokio_util::codec::Decoder for OctetCountingDecoder {
256 type Item = Bytes;
257 type Error = BoxedFramingError;
258
259 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
260 if let Some(ret) = self.checked_decode(src) {
261 ret
262 } else {
263 self.other
265 .decode(src)
266 .map(|line| line.map(|line| line.into()))
267 }
268 .map_err(Into::into)
269 }
270
271 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
272 if let Some(ret) = self.checked_decode(buf) {
273 ret
274 } else {
275 self.other
277 .decode_eof(buf)
278 .map(|line| line.map(|line| line.into()))
279 }
280 .map_err(Into::into)
281 }
282}
283
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use tokio_util::codec::Decoder;

    use super::*;

    #[test]
    fn non_octet_decode_works_with_multiple_frames() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(128);
        let mut buffer = BytesMut::with_capacity(16);

        // Input that does not start with a non-zero digit is newline-delimited,
        // so a partial line yields nothing until its newline arrives.
        buffer.put(&b"<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were "[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(Ok(None), result.map_err(|_| true));

        buffer.put(&b"8 penguins in the shop.\n"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(
            Ok(Some("<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were 8 penguins in the shop.".into())),
            result.map_err(|_| true)
        );
    }

    #[test]
    fn octet_decode_works_with_multiple_frames() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(30);
        let mut buffer = BytesMut::with_capacity(16);

        buffer.put(&b"28 abcdefghijklm"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(Ok(None), result.map_err(|_| false));

        // The second chunk completes the 28-byte payload; its leading digit is
        // payload, not a new length prefix.
        buffer.put(&b"3 nopqrstuvwxyz"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(
            Ok(Some("abcdefghijklm3 nopqrstuvwxyz".into())),
            result.map_err(|_| false)
        );
    }

    #[test]
    fn octet_decode_moves_past_invalid_length() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(16);

        // "232>1" is not a number; it and its separating space are skipped.
        buffer.put(&b"232>1 zork"[..]);
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b"zork"[..], buffer);
    }

    #[test]
    fn octet_decode_moves_past_invalid_utf8() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(16);

        // A 4-byte payload that is not valid UTF-8 is rejected and consumed.
        buffer.put(&[b'4', b' ', 0xf0, 0x28, 0x8c, 0xbc][..]);
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b""[..], buffer);
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(32);

        buffer.put(&b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit\n"[..]);
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b""[..], buffer);
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(32);

        buffer.put(&b"26 abcdefghijklmnopqrstuvwxyzand here we are"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(Ok(None), result.map_err(|_| false));
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b"and here we are"[..], buffer);
    }

    #[test]
    fn octet_decode_rejects_exceeded_frame_length_multiple_frames() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(32);

        // The declared length (26) exceeds the limit, so the prefix is
        // consumed and the decoder waits to skip the payload.
        buffer.put(&b"26 abc"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(Ok(None), result.map_err(|_| false));

        buffer.put(&b"defghijklmnopqrstuvwxyzand here we are"[..]);
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b"and here we are"[..], buffer);
    }

    #[test]
    fn octet_decode_moves_past_exceeded_frame_length_multiple_frames() {
        let mut decoder = OctetCountingDecoder::new_with_max_length(16);
        let mut buffer = BytesMut::with_capacity(32);

        buffer.put(&b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit"[..]);
        let result = decoder.decode(&mut buffer);
        assert_eq!(Ok(None), result.map_err(|_| false));

        assert_eq!(decoder.octet_decoding, Some(State::DiscardingToEol));
        buffer.put(&b"wemustcontinuetodiscard\n32 something valid"[..]);
        let result = decoder.decode(&mut buffer);

        assert!(result.is_err());
        assert_eq!(b"32 something valid"[..], buffer);
    }
}