codecs/decoding/framing/
mod.rs

1//! A collection of framing methods that can be used to convert from byte frames
2//! with defined boundaries to byte chunks.
3
4#![deny(missing_docs)]
5
6mod bytes;
7mod character_delimited;
8mod chunked_gelf;
9mod length_delimited;
10mod newline_delimited;
11mod octet_counting;
12
13use std::{any::Any, fmt::Debug};
14
15use ::bytes::Bytes;
16pub use character_delimited::{
17    CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions,
18};
19pub use chunked_gelf::{ChunkedGelfDecoder, ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions};
20use dyn_clone::DynClone;
21pub use length_delimited::{LengthDelimitedDecoder, LengthDelimitedDecoderConfig};
22pub use newline_delimited::{
23    NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, NewlineDelimitedDecoderOptions,
24};
25pub use octet_counting::{
26    OctetCountingDecoder, OctetCountingDecoderConfig, OctetCountingDecoderOptions,
27};
28use tokio_util::codec::LinesCodecError;
29
30pub use self::bytes::{BytesDecoder, BytesDecoderConfig};
31use super::StreamDecodingError;
32
33/// An error that occurred while producing byte frames from a byte stream / byte
34/// message.
35///
36/// It requires conformance to `TcpError` so that we can determine whether the
37/// error is recoverable or if trying to continue will lead to hanging up the
38/// TCP source indefinitely.
39pub trait FramingError: std::error::Error + StreamDecodingError + Send + Sync + Any {
40    /// Coerces the error to a `dyn Any`.
41    /// This is useful for downcasting the error to a concrete type
42    fn as_any(&self) -> &dyn Any;
43}
44
45impl std::error::Error for BoxedFramingError {}
46
47impl FramingError for std::io::Error {
48    fn as_any(&self) -> &dyn Any {
49        self as &dyn Any
50    }
51}
52
53impl FramingError for LinesCodecError {
54    fn as_any(&self) -> &dyn Any {
55        self as &dyn Any
56    }
57}
58
59impl<T> From<T> for BoxedFramingError
60where
61    T: FramingError + 'static,
62{
63    fn from(value: T) -> Self {
64        Box::new(value)
65    }
66}
67
68/// A `Box` containing a `FramingError`.
69pub type BoxedFramingError = Box<dyn FramingError>;
70
71impl StreamDecodingError for BoxedFramingError {
72    fn can_continue(&self) -> bool {
73        self.as_ref().can_continue()
74    }
75}
76
77/// Produce byte frames from a byte stream / byte message.
78pub trait Framer:
79    tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError> + DynClone + Debug + Send + Sync
80{
81}
82
83/// Default implementation for `Framer`s that implement
84/// `tokio_util::codec::Decoder`.
85impl<Decoder> Framer for Decoder where
86    Decoder: tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError>
87        + Clone
88        + Debug
89        + Send
90        + Sync
91{
92}
93
94dyn_clone::clone_trait_object!(Framer);
95
96/// A `Box` containing a `Framer`.
97pub type BoxedFramer = Box<dyn Framer>;