codecs/decoding/framing/
octet_counting.rs1use std::io;
2
3use bytes::{Buf, Bytes, BytesMut};
4use tokio_util::codec::{LinesCodec, LinesCodecError};
5use tracing::trace;
6use vector_config::configurable_component;
7
8use super::BoxedFramingError;
9
10#[configurable_component]
12#[derive(Debug, Clone, Default)]
13pub struct OctetCountingDecoderConfig {
14 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
15 pub octet_counting: OctetCountingDecoderOptions,
17}
18
19impl OctetCountingDecoderConfig {
20 pub fn build(&self) -> OctetCountingDecoder {
22 if let Some(max_length) = self.octet_counting.max_length {
23 OctetCountingDecoder::new_with_max_length(max_length)
24 } else {
25 OctetCountingDecoder::new()
26 }
27 }
28}
29
30#[configurable_component]
32#[derive(Clone, Debug, Default, PartialEq, Eq)]
33pub struct OctetCountingDecoderOptions {
34 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
36 pub max_length: Option<usize>,
37}
38
39#[derive(Clone, Debug)]
42pub struct OctetCountingDecoder {
43 other: LinesCodec,
44 octet_decoding: Option<State>,
45}
46
/// Progress of [`OctetCountingDecoder`] through an octet-counted frame.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum State {
    /// Reading a `<len> <data>` frame normally.
    NotDiscarding,
    /// Skipping a frame whose declared length exceeds the limit; holds the
    /// number of bytes still to be dropped.
    Discarding(usize),
    /// Skipping input up to and including the next newline.
    DiscardingToEol,
}
53
54impl OctetCountingDecoder {
55 pub fn new() -> Self {
57 Self {
58 other: LinesCodec::new(),
59 octet_decoding: None,
60 }
61 }
62
63 pub fn new_with_max_length(max_length: usize) -> Self {
65 Self {
66 other: LinesCodec::new_with_max_length(max_length),
67 octet_decoding: None,
68 }
69 }
70
71 fn octet_decode(
73 &mut self,
74 state: State,
75 src: &mut BytesMut,
76 ) -> Result<Option<Bytes>, LinesCodecError> {
77 let space_pos = src.iter().position(|&b| b == b' ');
87
88 let newline_pos = src.iter().position(|&b| b == b'\n');
90
91 match (state, newline_pos, space_pos) {
92 (State::Discarding(chars), _, _) if src.len() >= chars => {
93 src.advance(chars);
97 self.octet_decoding = None;
98 Err(LinesCodecError::Io(io::Error::other(
99 "Frame length limit exceeded",
100 )))
101 }
102
103 (State::Discarding(chars), _, _) => {
104 self.octet_decoding = Some(State::Discarding(src.len() - chars));
109 src.advance(src.len());
110 Ok(None)
111 }
112
113 (State::DiscardingToEol, Some(offset), _) => {
114 src.advance(offset + 1);
116 self.octet_decoding = None;
117 Err(LinesCodecError::Io(io::Error::other(
118 "Frame length limit exceeded",
119 )))
120 }
121
122 (State::DiscardingToEol, None, _) => {
123 src.advance(src.len());
129 Ok(None)
130 }
131
132 (State::NotDiscarding, _, Some(space_pos)) if space_pos < self.other.max_length() => {
133 let len: usize = match std::str::from_utf8(&src[..space_pos])
139 .map_err(|_| ())
140 .and_then(|num| num.parse().map_err(|_| ()))
141 {
142 Ok(len) => len,
143 Err(_) => {
144 src.advance(space_pos + 1);
149 self.octet_decoding = None;
150 return Err(LinesCodecError::Io(io::Error::new(
151 io::ErrorKind::InvalidData,
152 "Unable to decode message len as number",
153 )));
154 }
155 };
156
157 let from = space_pos + 1;
158 let to = from + len;
159
160 if len > self.other.max_length() {
161 self.octet_decoding = Some(State::Discarding(len));
165 src.advance(space_pos + 1);
166
167 Ok(None)
168 } else if let Some(msg) = src.get(from..to) {
169 let bytes = match std::str::from_utf8(msg) {
170 Ok(_) => Bytes::copy_from_slice(msg),
171 Err(_) => {
172 src.advance(to);
177 self.octet_decoding = None;
178 return Err(LinesCodecError::Io(io::Error::new(
179 io::ErrorKind::InvalidData,
180 "Unable to decode message as UTF8",
181 )));
182 }
183 };
184
185 src.advance(to);
187 self.octet_decoding = None;
188 Ok(Some(bytes))
189 } else {
190 Ok(None)
196 }
197 }
198
199 (State::NotDiscarding, Some(newline_pos), _) => {
200 src.advance(newline_pos + 1);
202 Err(LinesCodecError::Io(io::Error::other(
203 "Frame length limit exceeded",
204 )))
205 }
206
207 (State::NotDiscarding, None, _) if src.len() < self.other.max_length() => {
208 Ok(None)
214 }
215
216 (State::NotDiscarding, None, _) => {
217 self.octet_decoding = Some(State::DiscardingToEol);
222 src.advance(src.len());
223 Ok(None)
224 }
225 }
226 }
227
228 fn checked_decode(
230 &mut self,
231 src: &mut BytesMut,
232 ) -> Option<Result<Option<Bytes>, LinesCodecError>> {
233 if let Some(&first_byte) = src.first()
234 && (49..=57).contains(&first_byte)
235 {
236 trace!("Octet counting encoded event detected.");
239 self.octet_decoding = Some(State::NotDiscarding);
240 }
241
242 self.octet_decoding
243 .map(|state| self.octet_decode(state, src))
244 }
245}
246
247impl Default for OctetCountingDecoder {
248 fn default() -> Self {
249 Self::new()
250 }
251}
252
253impl tokio_util::codec::Decoder for OctetCountingDecoder {
254 type Item = Bytes;
255 type Error = BoxedFramingError;
256
257 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
258 if let Some(ret) = self.checked_decode(src) {
259 ret
260 } else {
261 self.other
263 .decode(src)
264 .map(|line| line.map(|line| line.into()))
265 }
266 .map_err(Into::into)
267 }
268
269 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
270 if let Some(ret) = self.checked_decode(buf) {
271 ret
272 } else {
273 self.other
275 .decode_eof(buf)
276 .map(|line| line.map(|line| line.into()))
277 }
278 .map_err(Into::into)
279 }
280}
281
#[cfg(test)]
mod tests {
    #![allow(clippy::print_stdout)]

    use bytes::BufMut;
    use tokio_util::codec::Decoder;

    use super::*;

    /// A line without a length prefix goes through newline framing and is
    /// emitted only once its newline has arrived.
    #[test]
    fn non_octet_decode_works_with_multiple_frames() {
        let mut codec = OctetCountingDecoder::new_with_max_length(128);
        let mut buf = BytesMut::with_capacity(16);

        buf.put_slice(b"<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were ");
        let first = codec.decode(&mut buf).map_err(|_| true);
        assert_eq!(first, Ok(None));

        buf.put_slice(b"8 penguins in the shop.\n");
        let second = codec.decode(&mut buf).map_err(|_| true);
        let expected: Bytes =
            "<57>Mar 25 21:47:46 gleichner6005 quaerat[2444]: There were 8 penguins in the shop."
                .into();
        assert_eq!(second, Ok(Some(expected)));
    }

    /// An octet-counted frame split across two reads is reassembled, with
    /// digits and spaces inside the payload treated as data.
    #[test]
    fn octet_decode_works_with_multiple_frames() {
        let mut codec = OctetCountingDecoder::new_with_max_length(30);
        let mut buf = BytesMut::with_capacity(16);

        buf.put_slice(b"28 abcdefghijklm");
        let partial = codec.decode(&mut buf).map_err(|_| false);
        assert_eq!(partial, Ok(None));

        buf.put_slice(b"3 nopqrstuvwxyz");
        let complete = codec.decode(&mut buf).map_err(|_| false);
        assert_eq!(complete, Ok(Some(Bytes::from("abcdefghijklm3 nopqrstuvwxyz"))));
    }

    /// A non-numeric length prefix is rejected and skipped through its space.
    #[test]
    fn octet_decode_moves_past_invalid_length() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(16);

        buf.put_slice(b"232>1 zork");
        let outcome = codec.decode(&mut buf);

        assert!(outcome.is_err());
        assert_eq!(b"zork"[..], buf);
    }

    /// A payload that is not valid UTF-8 is rejected and consumed.
    #[test]
    fn octet_decode_moves_past_invalid_utf8() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(16);

        buf.put_slice(&[b'4', b' ', 0xf0, 0x28, 0x8c, 0xbc]);
        let outcome = codec.decode(&mut buf);

        assert!(outcome.is_err());
        assert_eq!(b""[..], buf);
    }

    /// An over-long prefix without a space is skipped through the newline.
    #[test]
    fn octet_decode_moves_past_exceeded_frame_length() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(32);

        buf.put_slice(b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit\n");
        let outcome = codec.decode(&mut buf);

        assert!(outcome.is_err());
        assert_eq!(b""[..], buf);
    }

    /// A declared length above the limit is skipped on the following call,
    /// leaving the trailing data untouched.
    #[test]
    fn octet_decode_rejects_exceeded_frame_length() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(32);

        buf.put_slice(b"26 abcdefghijklmnopqrstuvwxyzand here we are");
        let pending = codec.decode(&mut buf).map_err(|_| false);
        assert_eq!(pending, Ok(None));

        let outcome = codec.decode(&mut buf);
        assert!(outcome.is_err());
        assert_eq!(b"and here we are"[..], buf);
    }

    /// Skipping an oversized frame carries over into data from a later read.
    #[test]
    fn octet_decode_rejects_exceeded_frame_length_multiple_frames() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(32);

        buf.put_slice(b"26 abc");
        let _ = codec.decode(&mut buf);

        buf.put_slice(b"defghijklmnopqrstuvwxyzand here we are");
        let outcome = codec.decode(&mut buf);

        println!("{outcome:?}");
        assert!(outcome.is_err());
        assert_eq!(b"and here we are"[..], buf);
    }

    /// Discarding to end-of-line continues across reads until a newline,
    /// after which the next frame is left intact in the buffer.
    #[test]
    fn octet_decode_moves_past_exceeded_frame_length_multiple_frames() {
        let mut codec = OctetCountingDecoder::new_with_max_length(16);
        let mut buf = BytesMut::with_capacity(32);

        buf.put_slice(b"32thisshouldbelongerthanthmaxframeasizewhichmeansthesyslogparserwillnotbeabletodecodeit");
        _ = codec.decode(&mut buf);
        assert_eq!(codec.octet_decoding, Some(State::DiscardingToEol));

        buf.put_slice(b"wemustcontinuetodiscard\n32 something valid");
        let outcome = codec.decode(&mut buf);

        assert!(outcome.is_err());
        assert_eq!(b"32 something valid"[..], buf);
    }
}