codecs/decoding/framing/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//! A collection of framing methods that can be used to convert from byte frames
//! with defined boundaries to byte chunks.

#![deny(missing_docs)]

mod bytes;
mod character_delimited;
mod chunked_gelf;
mod length_delimited;
mod newline_delimited;
mod octet_counting;

use std::{any::Any, fmt::Debug};

use ::bytes::Bytes;
pub use character_delimited::{
    CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions,
};
pub use chunked_gelf::{ChunkedGelfDecoder, ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions};
use dyn_clone::DynClone;
pub use length_delimited::{LengthDelimitedDecoder, LengthDelimitedDecoderConfig};
pub use newline_delimited::{
    NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, NewlineDelimitedDecoderOptions,
};
pub use octet_counting::{
    OctetCountingDecoder, OctetCountingDecoderConfig, OctetCountingDecoderOptions,
};
use tokio_util::codec::LinesCodecError;

pub use self::bytes::{BytesDecoder, BytesDecoderConfig};
use super::StreamDecodingError;

/// An error that occurred while producing byte frames from a byte stream / byte
/// message.
///
/// It requires conformance to `TcpError` so that we can determine whether the
/// error is recoverable or if trying to continue will lead to hanging up the
/// TCP source indefinitely.
pub trait FramingError: std::error::Error + StreamDecodingError + Send + Sync + Any {
    /// Coerces the error to a `dyn Any`.
    /// This is useful for downcasting the error to a concrete type
    fn as_any(&self) -> &dyn Any;
}

impl std::error::Error for BoxedFramingError {}

impl FramingError for std::io::Error {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}

impl FramingError for LinesCodecError {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}

impl<T> From<T> for BoxedFramingError
where
    T: FramingError + 'static,
{
    fn from(value: T) -> Self {
        Box::new(value)
    }
}

/// A `Box` containing a `FramingError`.
pub type BoxedFramingError = Box<dyn FramingError>;

impl StreamDecodingError for BoxedFramingError {
    fn can_continue(&self) -> bool {
        self.as_ref().can_continue()
    }
}

/// Produce byte frames from a byte stream / byte message.
pub trait Framer:
    tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError> + DynClone + Debug + Send + Sync
{
}

/// Default implementation for `Framer`s that implement
/// `tokio_util::codec::Decoder`.
impl<Decoder> Framer for Decoder where
    Decoder: tokio_util::codec::Decoder<Item = Bytes, Error = BoxedFramingError>
        + Clone
        + Debug
        + Send
        + Sync
{
}

dyn_clone::clone_trait_object!(Framer);

/// A `Box` containing a `Framer`.
pub type BoxedFramer = Box<dyn Framer>;