codecs/decoding/framing/
mod.rs

1//! A collection of framing methods that can be used to convert from byte frames
2//! with defined boundaries to byte chunks.
3
4#![deny(missing_docs)]
5
6mod bytes;
7mod character_delimited;
8mod chunked_gelf;
9mod length_delimited;
10mod newline_delimited;
11mod octet_counting;
12mod varint_length_delimited;
13
14use std::{any::Any, fmt::Debug};
15
16use ::bytes::Bytes;
17pub use character_delimited::{
18    CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions,
19};
20pub use chunked_gelf::{ChunkedGelfDecoder, ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions};
21use dyn_clone::DynClone;
22pub use length_delimited::{LengthDelimitedDecoder, LengthDelimitedDecoderConfig};
23pub use newline_delimited::{
24    NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, NewlineDelimitedDecoderOptions,
25};
26pub use octet_counting::{
27    OctetCountingDecoder, OctetCountingDecoderConfig, OctetCountingDecoderOptions,
28};
29use tokio_util::codec::LinesCodecError;
30pub use varint_length_delimited::{
31    VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
32};
33
34pub use self::bytes::{BytesDecoder, BytesDecoderConfig};
35use super::StreamDecodingError;
36
37/// An error that occurred while producing byte frames from a byte stream / byte
38/// message.
39///
40/// It requires conformance to `TcpError` so that we can determine whether the
41/// error is recoverable or if trying to continue will lead to hanging up the
42/// TCP source indefinitely.
43pub trait FramingError: std::error::Error + StreamDecodingError + Send + Sync + Any {
44    /// Coerces the error to a `dyn Any`.
45    /// This is useful for downcasting the error to a concrete type
46    fn as_any(&self) -> &dyn Any;
47}
48
49impl std::error::Error for BoxedFramingError {}
50
51impl FramingError for std::io::Error {
52    fn as_any(&self) -> &dyn Any {
53        self as &dyn Any
54    }
55}
56
57impl FramingError for LinesCodecError {
58    fn as_any(&self) -> &dyn Any {
59        self as &dyn Any
60    }
61}
62
63impl<T> From<T> for BoxedFramingError
64where
65    T: FramingError + 'static,
66{
67    fn from(value: T) -> Self {
68        Box::new(value)
69    }
70}
71
72/// A `Box` containing a `FramingError`.
73pub type BoxedFramingError = Box<dyn FramingError>;
74
75impl StreamDecodingError for BoxedFramingError {
76    fn can_continue(&self) -> bool {
77        self.as_ref().can_continue()
78    }
79}
80
81/// Produce byte frames from a byte stream / byte message.
82pub trait Framer:
83    tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError> + DynClone + Debug + Send + Sync
84{
85}
86
87/// Default implementation for `Framer`s that implement
88/// `tokio_util::codec::Decoder`.
89impl<Decoder> Framer for Decoder where
90    Decoder: tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError>
91        + Clone
92        + Debug
93        + Send
94        + Sync
95{
96}
97
98dyn_clone::clone_trait_object!(Framer);
99
100/// A `Box` containing a `Framer`.
101pub type BoxedFramer = Box<dyn Framer>;