codecs/decoding/framing/
chunked_gelf.rs

1use std::{
2    any::Any,
3    collections::HashMap,
4    io::Read,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use derivative::Derivative;
11use flate2::read::{MultiGzDecoder, ZlibDecoder};
12use snafu::{ResultExt, Snafu, ensure};
13use tokio::{self, task::JoinHandle};
14use tokio_util::codec::Decoder;
15use tracing::{debug, trace, warn};
16use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
17use vector_config::configurable_component;
18
19use super::{BoxedFramingError, FramingError};
20use crate::{BytesDecoder, StreamDecodingError};
21
22const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
23const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
24const DEFAULT_TIMEOUT_SECS: f64 = 5.0;
25
/// Serde default helper: returns the decoder's default message timeout, in seconds.
const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}
29
/// Config used to build a `ChunkedGelfDecoder`.
///
/// This is the user-facing wrapper consumed by the framing configuration;
/// the actual tuning knobs live in [`ChunkedGelfDecoderOptions`].
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}
38
impl ChunkedGelfDecoderConfig {
    /// Build the `ChunkedGelfDecoder` from this configuration.
    pub fn build(&self) -> ChunkedGelfDecoder {
        // Forward each option verbatim; `ChunkedGelfDecoder::new` owns any
        // conversion (e.g. seconds -> `Duration`).
        ChunkedGelfDecoder::new(
            self.chunked_gelf.timeout_secs,
            self.chunked_gelf.pending_messages_limit,
            self.chunked_gelf.max_length,
            self.chunked_gelf.decompression,
        )
    }
}
50
/// Options for building a `ChunkedGelfDecoder`.
///
/// All limits are optional; when unset, the corresponding resource usage is
/// unbounded (matching Graylog Server's behavior).
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a message to be fully received. If the timeout is reached, the
    /// decoder drops all the received chunks of the timed out message.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
    /// dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded.
    /// If this option is not set, the decoder does not limit the number of pending messages and the memory usage
    /// of its messages buffer can grow unbounded. This matches Graylog Server's behavior.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length of a single GELF message, in bytes. Messages longer than this length will
    /// be dropped. If this option is not set, the decoder does not limit the length of messages and
    /// the per-message memory is unbounded.
    ///
    /// **Note**: A message can be composed of multiple chunks and this limit is applied to the whole
    /// message, not to individual chunks.
    ///
    /// This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation.
    /// The message's payload is the concatenation of all the chunks' payloads.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// Decompression configuration for GELF messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}
85
/// Decompression options for ChunkedGelfDecoder.
///
/// `Auto` inspects the (reassembled) payload's magic bytes per message; the
/// other variants force a fixed method regardless of payload content.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the decompression method based on the magic bytes of the message.
    #[derivative(Default)]
    Auto,
    /// Use Gzip decompression.
    Gzip,
    /// Use Zlib decompression.
    Zlib,
    /// Do not decompress the message.
    None,
}
101
102impl ChunkedGelfDecompressionConfig {
103    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
104        match self {
105            Self::Auto => ChunkedGelfDecompression::from_magic(data),
106            Self::Gzip => ChunkedGelfDecompression::Gzip,
107            Self::Zlib => ChunkedGelfDecompression::Zlib,
108            Self::None => ChunkedGelfDecompression::None,
109        }
110    }
111}
112
/// Buffered state for one in-flight chunked message, keyed by message id in
/// the decoder's state map.
#[derive(Debug)]
struct MessageState {
    /// Number of chunks this message was split into (1..=GELF_MAX_TOTAL_CHUNKS).
    total_chunks: u8,
    /// Chunk payloads indexed by sequence number; slots not yet received hold
    /// an empty `Bytes`.
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    /// Bit N set means the chunk with sequence number N has been received.
    chunks_bitmap: u128,
    /// Sum of payload lengths received so far, in bytes (headers excluded).
    current_length: usize,
    /// Task that evicts this state when the timeout elapses; aborted once the
    /// message completes.
    timeout_task: JoinHandle<()>,
}
121
122impl MessageState {
123    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
124        Self {
125            total_chunks,
126            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
127            chunks_bitmap: 0,
128            current_length: 0,
129            timeout_task,
130        }
131    }
132
133    fn is_chunk_present(&self, sequence_number: u8) -> bool {
134        let chunk_bitmap_id = 1 << sequence_number;
135        self.chunks_bitmap & chunk_bitmap_id != 0
136    }
137
138    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
139        let chunk_bitmap_id = 1 << sequence_number;
140        self.chunks_bitmap |= chunk_bitmap_id;
141        self.current_length += chunk.remaining();
142        self.chunks[sequence_number as usize] = chunk;
143    }
144
145    fn is_complete(&self) -> bool {
146        self.chunks_bitmap.count_ones() == self.total_chunks as u32
147    }
148
149    fn current_length(&self) -> usize {
150        self.current_length
151    }
152
153    fn retrieve_message(&self) -> Option<Bytes> {
154        if self.is_complete() {
155            self.timeout_task.abort();
156            let chunks = &self.chunks[0..self.total_chunks as usize];
157            let mut message = BytesMut::new();
158            for chunk in chunks {
159                message.extend_from_slice(chunk);
160            }
161            Some(message.freeze())
162        } else {
163            None
164        }
165    }
166}
167
/// The concrete decompression method applied to an assembled GELF payload.
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    /// Payload is a gzip stream.
    Gzip,
    /// Payload is a zlib stream.
    Zlib,
    /// Payload is passed through unchanged.
    None,
}
174
175impl ChunkedGelfDecompression {
176    pub fn from_magic(data: &Bytes) -> Self {
177        if data.starts_with(GZIP_MAGIC) {
178            trace!("Detected Gzip compression");
179            return Self::Gzip;
180        }
181
182        if data.starts_with(ZLIB_MAGIC) {
183            // Based on https://datatracker.ietf.org/doc/html/rfc1950#section-2.2
184            if let Some([first_byte, second_byte]) = data.get(0..2)
185                && (*first_byte as u16 * 256 + *second_byte as u16).is_multiple_of(31)
186            {
187                trace!("Detected Zlib compression");
188                return Self::Zlib;
189            };
190
191            warn!(
192                "Detected Zlib magic bytes but the header is invalid: {:?}",
193                data.get(0..2)
194            );
195        };
196
197        trace!("No compression detected",);
198        Self::None
199    }
200
201    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
202        let decompressed = match self {
203            Self::Gzip => {
204                let mut decoder = MultiGzDecoder::new(data.reader());
205                let mut decompressed = Vec::new();
206                decoder
207                    .read_to_end(&mut decompressed)
208                    .context(GzipDecompressionSnafu)?;
209                Bytes::from(decompressed)
210            }
211            Self::Zlib => {
212                let mut decoder = ZlibDecoder::new(data.reader());
213                let mut decompressed = Vec::new();
214                decoder
215                    .read_to_end(&mut decompressed)
216                    .context(ZlibDecompressionSnafu)?;
217                Bytes::from(decompressed)
218            }
219            Self::None => data,
220        };
221        Ok(decompressed)
222    }
223}
224
/// Errors produced while decompressing an assembled GELF payload.
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    /// The payload looked like gzip but failed to decompress.
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    /// The payload looked like zlib but failed to decompress.
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}
232
/// Errors produced while decoding (possibly chunked) GELF frames. All of
/// these are recoverable: the stream can continue after any of them
/// (see the `StreamDecodingError` impl below).
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
    ))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"
    ))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
    ))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"
    ))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    #[snafu(display(
        "Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"
    ))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    #[snafu(display("Error while decompressing message. {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}
284
impl StreamDecodingError for ChunkedGelfDecoderError {
    fn can_continue(&self) -> bool {
        // A malformed or over-limit chunk only affects its own message; the
        // byte stream itself stays usable, so decoding always continues.
        true
    }
}
290
291impl FramingError for ChunkedGelfDecoderError {
292    fn as_any(&self) -> &dyn Any {
293        self as &dyn Any
294    }
295}
296
/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP)
/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go).
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    // We have to use this decoder to read all the bytes from the buffer first and don't let tokio
    // read it buffered, as tokio FramedRead will not always call the decode method with the
    // whole message. (see https://docs.rs/tokio-util/latest/src/tokio_util/codec/framed_impl.rs.html#26).
    // This limitation is due to the fact that the GELF format does not specify the length of the
    // message, so we have to read all the bytes from the message (datagram)
    bytes_decoder: BytesDecoder,
    /// How to decompress assembled payloads.
    decompression_config: ChunkedGelfDecompressionConfig,
    /// In-flight chunked messages keyed by message id. Shared with the
    /// per-message timeout tasks, which remove entries on expiry.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    /// How long to buffer an incomplete message before discarding it.
    timeout: Duration,
    /// Optional cap on concurrently-buffered incomplete messages.
    pending_messages_limit: Option<usize>,
    /// Optional cap on a message's total payload length, in bytes.
    max_length: Option<usize>,
}
313
314impl ChunkedGelfDecoder {
315    /// Creates a new `ChunkedGelfDecoder`.
316    pub fn new(
317        timeout_secs: f64,
318        pending_messages_limit: Option<usize>,
319        max_length: Option<usize>,
320        decompression_config: ChunkedGelfDecompressionConfig,
321    ) -> Self {
322        Self {
323            bytes_decoder: BytesDecoder::new(),
324            decompression_config,
325            state: Arc::new(Mutex::new(HashMap::new())),
326            timeout: Duration::from_secs_f64(timeout_secs),
327            pending_messages_limit,
328            max_length,
329        }
330    }
331
332    /// Decode a GELF chunk
333    pub fn decode_chunk(
334        &mut self,
335        mut chunk: Bytes,
336    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
337        // Encoding scheme:
338        //
339        // +------------+-----------------+--------------+----------------------+
340        // | Message id | Sequence number | Total chunks |    Chunk payload     |
341        // +------------+-----------------+--------------+----------------------+
342        // | 64 bits    | 8 bits          | 8 bits       | remaining bits       |
343        // +------------+-----------------+--------------+----------------------+
344        //
345        // As this codec is oriented for UDP, the chunks (datagrams) are not guaranteed to be received in order,
346        // nor to be received at all. So, we have to store the chunks in a buffer (state field) until we receive
347        // all the chunks of a message. When we receive all the chunks of a message, we can concatenate them
348        // and return the complete payload.
349
350        // We need 10 bytes to read the message id, sequence number and total chunks
351        ensure!(
352            chunk.remaining() >= 10,
353            InvalidChunkHeaderSnafu { header: chunk }
354        );
355
356        let message_id = chunk.get_u64();
357        let sequence_number = chunk.get_u8();
358        let total_chunks = chunk.get_u8();
359
360        ensure!(
361            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
362            InvalidTotalChunksSnafu {
363                message_id,
364                sequence_number,
365                total_chunks
366            }
367        );
368
369        ensure!(
370            sequence_number < total_chunks,
371            InvalidSequenceNumberSnafu {
372                message_id,
373                sequence_number,
374                total_chunks
375            }
376        );
377
378        let mut state_lock = self.state.lock().expect("poisoned lock");
379
380        if let Some(pending_messages_limit) = self.pending_messages_limit {
381            ensure!(
382                state_lock.len() < pending_messages_limit,
383                PendingMessagesLimitReachedSnafu {
384                    message_id,
385                    sequence_number,
386                    pending_messages_limit
387                }
388            );
389        }
390
391        let message_state = state_lock.entry(message_id).or_insert_with(|| {
392            // We need to spawn a task that will clear the message state after a certain time
393            // otherwise we will have a memory leak due to messages that never complete
394            let state = Arc::clone(&self.state);
395            let timeout = self.timeout;
396            let timeout_handle = tokio::spawn(async move {
397                tokio::time::sleep(timeout).await;
398                let mut state_lock = state.lock().expect("poisoned lock");
399                if state_lock.remove(&message_id).is_some() {
400                    warn!(
401                        message_id = message_id,
402                        timeout_secs = timeout.as_secs_f64(),
403                        "Message was not fully received within the timeout window. Discarding it."
404                    );
405                }
406            });
407            MessageState::new(total_chunks, timeout_handle)
408        });
409
410        ensure!(
411            message_state.total_chunks == total_chunks,
412            TotalChunksMismatchSnafu {
413                message_id,
414                sequence_number,
415                original_total_chunks: message_state.total_chunks,
416                received_total_chunks: total_chunks
417            }
418        );
419
420        if message_state.is_chunk_present(sequence_number) {
421            debug!(
422                message_id = message_id,
423                sequence_number = sequence_number,
424                "Received a duplicate chunk. Ignoring it."
425            );
426            return Ok(None);
427        }
428
429        message_state.add_chunk(sequence_number, chunk);
430
431        if let Some(max_length) = self.max_length {
432            let length = message_state.current_length();
433            if length > max_length {
434                state_lock.remove(&message_id);
435                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
436                    message_id,
437                    sequence_number,
438                    length,
439                    max_length,
440                });
441            }
442        }
443
444        if let Some(message) = message_state.retrieve_message() {
445            state_lock.remove(&message_id);
446            Ok(Some(message))
447        } else {
448            Ok(None)
449        }
450    }
451
452    /// Decode a GELF message that may be chunked or not. The source bytes are expected to be
453    /// datagram-based (or message-based), so it must not contain multiple GELF messages
454    /// delimited by '\0', such as it would be in a stream-based protocol.
455    pub fn decode_message(
456        &mut self,
457        mut src: Bytes,
458    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
459        let message = if src.starts_with(GELF_MAGIC) {
460            trace!("Received a chunked GELF message based on the magic bytes");
461            src.advance(2);
462            self.decode_chunk(src)?
463        } else {
464            trace!(
465                "Received an unchunked GELF message. First two bytes of message: {:?}",
466                &src[0..2]
467            );
468            Some(src)
469        };
470
471        // We can have both chunked and unchunked messages that are compressed
472        message
473            .map(|message| {
474                self.decompression_config
475                    .get_decompression(&message)
476                    .decompress(message)
477                    .context(DecompressionSnafu)
478            })
479            .transpose()
480    }
481}
482
483impl Default for ChunkedGelfDecoder {
484    fn default() -> Self {
485        Self::new(
486            DEFAULT_TIMEOUT_SECS,
487            None,
488            None,
489            ChunkedGelfDecompressionConfig::Auto,
490        )
491    }
492}
493
494impl Decoder for ChunkedGelfDecoder {
495    type Item = Bytes;
496
497    type Error = BoxedFramingError;
498
499    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
500        if src.is_empty() {
501            return Ok(None);
502        }
503
504        Ok(self
505            .bytes_decoder
506            .decode(src)?
507            .and_then(|frame| self.decode_message(frame).transpose())
508            .transpose()?)
509    }
510    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
511        if buf.is_empty() {
512            return Ok(None);
513        }
514
515        Ok(self
516            .bytes_decoder
517            .decode_eof(buf)?
518            .and_then(|frame| self.decode_message(frame).transpose())
519            .transpose()?)
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use std::{fmt::Write as FmtWrite, io::Write as IoWrite};
526
527    use bytes::{BufMut, BytesMut};
528    use flate2::write::{GzEncoder, ZlibEncoder};
529    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
530    use rstest::{fixture, rstest};
531    use tracing_test::traced_test;
532
533    use super::*;
534
    // Compression methods used to build compressed test payloads.
    pub enum Compression {
        Gzip,
        Zlib,
    }
539
540    impl Compression {
541        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
542            self.compress_with_level(payload, flate2::Compression::default())
543        }
544
545        pub fn compress_with_level(
546            &self,
547            payload: &impl AsRef<[u8]>,
548            level: flate2::Compression,
549        ) -> Bytes {
550            match self {
551                Compression::Gzip => {
552                    let mut encoder = GzEncoder::new(Vec::new(), level);
553                    encoder
554                        .write_all(payload.as_ref())
555                        .expect("failed to write to encoder");
556                    encoder.finish().expect("failed to finish encoder").into()
557                }
558                Compression::Zlib => {
559                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
560                    encoder
561                        .write_all(payload.as_ref())
562                        .expect("failed to write to encoder");
563                    encoder.finish().expect("failed to finish encoder").into()
564                }
565            }
566        }
567    }
568
569    fn create_chunk(
570        message_id: u64,
571        sequence_number: u8,
572        total_chunks: u8,
573        payload: &impl AsRef<[u8]>,
574    ) -> BytesMut {
575        let mut chunk = BytesMut::new();
576        chunk.put_slice(GELF_MAGIC);
577        chunk.put_u64(message_id);
578        chunk.put_u8(sequence_number);
579        chunk.put_u8(total_chunks);
580        chunk.extend_from_slice(payload.as_ref());
581        chunk
582    }
583
    // Fixture: a plain (unchunked) GELF message and its expected decoded text.
    #[fixture]
    fn unchunked_message() -> (BytesMut, String) {
        let payload = "foo";
        (BytesMut::from(payload), payload.to_string())
    }
589
590    #[fixture]
591    fn two_chunks_message() -> ([BytesMut; 2], String) {
592        let message_id = 1u64;
593        let total_chunks = 2u8;
594
595        let first_sequence_number = 0u8;
596        let first_payload = "foo";
597        let first_chunk = create_chunk(
598            message_id,
599            first_sequence_number,
600            total_chunks,
601            &first_payload,
602        );
603
604        let second_sequence_number = 1u8;
605        let second_payload = "bar";
606        let second_chunk = create_chunk(
607            message_id,
608            second_sequence_number,
609            total_chunks,
610            &second_payload,
611        );
612
613        (
614            [first_chunk, second_chunk],
615            format!("{first_payload}{second_payload}"),
616        )
617    }
618
619    #[fixture]
620    fn three_chunks_message() -> ([BytesMut; 3], String) {
621        let message_id = 2u64;
622        let total_chunks = 3u8;
623
624        let first_sequence_number = 0u8;
625        let first_payload = "foo";
626        let first_chunk = create_chunk(
627            message_id,
628            first_sequence_number,
629            total_chunks,
630            &first_payload,
631        );
632
633        let second_sequence_number = 1u8;
634        let second_payload = "bar";
635        let second_chunk = create_chunk(
636            message_id,
637            second_sequence_number,
638            total_chunks,
639            &second_payload,
640        );
641
642        let third_sequence_number = 2u8;
643        let third_payload = "baz";
644        let third_chunk = create_chunk(
645            message_id,
646            third_sequence_number,
647            total_chunks,
648            &third_payload,
649        );
650
651        (
652            [first_chunk, second_chunk, third_chunk],
653            format!("{first_payload}{second_payload}{third_payload}"),
654        )
655    }
656
    // Helper: extract the concrete decoder error from a boxed framing error.
    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
        error
            .as_any()
            .downcast_ref::<ChunkedGelfDecoderError>()
            .expect("Expected ChunkedGelfDecoderError to be downcasted")
    }
663
    // In-order chunks: the first yields None (incomplete), the second
    // completes and returns the reassembled message.
    #[rstest]
    #[tokio::test]
    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }
676
    // An unchunked message is returned as-is in a single decode call.
    #[rstest]
    #[tokio::test]
    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
        let (mut message, expected_message) = unchunked_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }
686
    // Chunks may arrive out of order; the message is still reassembled in
    // sequence-number order.
    #[rstest]
    #[tokio::test]
    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }
699
    // Chunks of different messages may interleave; each message completes
    // independently once all of its own chunks have arrived.
    #[rstest]
    #[tokio::test]
    async fn decode_unordered_messages(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));

        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
    }
725
    // An unchunked message can be decoded while a chunked message is still
    // incomplete, without disturbing the buffered chunks.
    #[rstest]
    #[tokio::test]
    async fn decode_mixed_chunked_and_unchunked_messages(
        unchunked_message: (BytesMut, String),
        two_chunks_message: ([BytesMut; 2], String),
    ) {
        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
        let (mut chunks, expected_chunked_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
    }
745
    // Feeds every chunk of two 100-chunk messages in a seeded random order and
    // checks that both messages are eventually reassembled intact.
    #[tokio::test]
    async fn decode_shuffled_messages() {
        let mut rng = SmallRng::seed_from_u64(42);
        let total_chunks = 100u8;
        let first_message_id = 1u64;
        let first_payload = "first payload";
        let second_message_id = 2u64;
        let second_payload = "second payload";
        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                first_message_id,
                sequence_number,
                total_chunks,
                &first_payload,
            )
        });
        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                second_message_id,
                sequence_number,
                total_chunks,
                &second_payload,
            )
        });
        let expected_first_message = first_payload.repeat(total_chunks as usize);
        let expected_second_message = second_payload.repeat(total_chunks as usize);
        let mut merged_chunks = first_message_chunks
            .chain(second_message_chunks)
            .collect::<Vec<_>>();
        merged_chunks.shuffle(&mut rng);
        let mut decoder = ChunkedGelfDecoder::default();

        let mut count = 0;
        let first_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };
        // `count` is not advanced past the completing chunk: its buffer was
        // fully drained by the decode above, so re-decoding it yields `None`
        // and the second loop simply moves on.
        let second_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1
            }
        };

        // NOTE(review): the swapped variable/expected names indicate that with
        // seed 42 the second message completes before the first — confirm if
        // the seed or shuffle implementation ever changes.
        assert_eq!(second_retrieved_message, expected_first_message);
        assert_eq!(first_retrieved_message, expected_second_message);
    }
799
    // With tokio's clock paused, verify that a partially-received message is
    // evicted (and a warning is logged) once the timeout elapses — for each
    // orphaned chunk separately.
    #[rstest]
    #[tokio::test(start_paused = true)]
    #[traced_test]
    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(!decoder.state.lock().unwrap().is_empty());

        // The message state should be cleared after a certain time
        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it."
        ));

        // The second chunk now starts a fresh message state, which must also
        // time out on its own.
        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it"
        ));
    }
827
    // Empty input produces no frame and no error.
    #[tokio::test]
    async fn decode_empty_input() {
        let mut src = BytesMut::new();
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut src).unwrap();
        assert!(frame.is_none());
    }
836
    // A chunk whose header is shorter than the required 10 bytes is rejected
    // with `InvalidChunkHeader`.
    #[tokio::test]
    async fn decode_chunk_with_invalid_header() {
        let mut src = BytesMut::new();
        src.extend_from_slice(GELF_MAGIC);
        // Invalid chunk header with less than 10 bytes
        let invalid_chunk = [0x12, 0x34];
        src.extend_from_slice(&invalid_chunk);
        let mut decoder = ChunkedGelfDecoder::default();
        let frame = decoder.decode_eof(&mut src);

        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
        ));
    }
854
855    #[tokio::test]
856    async fn decode_chunk_with_invalid_total_chunks() {
857        let message_id = 1u64;
858        let sequence_number = 1u8;
859        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
860        let payload = "foo";
861        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
862        let mut decoder = ChunkedGelfDecoder::default();
863
864        let frame = decoder.decode_eof(&mut chunk);
865        let error = frame.unwrap_err();
866        let downcasted_error = downcast_framing_error(&error);
867        assert!(matches!(
868            downcasted_error,
869            ChunkedGelfDecoderError::InvalidTotalChunks {
870                message_id: 1,
871                sequence_number: 1,
872                total_chunks: 129,
873            }
874        ));
875    }
876
877    #[tokio::test]
878    async fn decode_chunk_with_invalid_sequence_number() {
879        let message_id = 1u64;
880        let total_chunks = 2u8;
881        let invalid_sequence_number = total_chunks + 1;
882        let payload = "foo";
883        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
884        let mut decoder = ChunkedGelfDecoder::default();
885
886        let frame = decoder.decode_eof(&mut chunk);
887        let error = frame.unwrap_err();
888        let downcasted_error = downcast_framing_error(&error);
889        assert!(matches!(
890            downcasted_error,
891            ChunkedGelfDecoderError::InvalidSequenceNumber {
892                message_id: 1,
893                sequence_number: 3,
894                total_chunks: 2,
895            }
896        ));
897    }
898
899    #[rstest]
900    #[tokio::test]
901    async fn decode_reached_pending_messages_limit(
902        two_chunks_message: ([BytesMut; 2], String),
903        three_chunks_message: ([BytesMut; 3], String),
904    ) {
905        let (mut two_chunks, _) = two_chunks_message;
906        let (mut three_chunks, _) = three_chunks_message;
907        let mut decoder = ChunkedGelfDecoder {
908            pending_messages_limit: Some(1),
909            ..Default::default()
910        };
911
912        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
913        assert!(frame.is_none());
914        assert!(decoder.state.lock().unwrap().len() == 1);
915
916        let frame = decoder.decode_eof(&mut three_chunks[0]);
917        let error = frame.unwrap_err();
918        let downcasted_error = downcast_framing_error(&error);
919        assert!(matches!(
920            downcasted_error,
921            ChunkedGelfDecoderError::PendingMessagesLimitReached {
922                message_id: 2u64,
923                sequence_number: 0u8,
924                pending_messages_limit: 1,
925            }
926        ));
927        assert!(decoder.state.lock().unwrap().len() == 1);
928    }
929
930    #[rstest]
931    #[tokio::test]
932    async fn decode_chunk_with_different_total_chunks() {
933        let message_id = 1u64;
934        let sequence_number = 0u8;
935        let total_chunks = 2u8;
936        let payload = "foo";
937        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
938        let mut second_chunk =
939            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
940        let mut decoder = ChunkedGelfDecoder::default();
941
942        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
943        assert!(frame.is_none());
944
945        let frame = decoder.decode_eof(&mut second_chunk);
946        let error = frame.unwrap_err();
947        let downcasted_error = downcast_framing_error(&error);
948        assert!(matches!(
949            downcasted_error,
950            ChunkedGelfDecoderError::TotalChunksMismatch {
951                message_id: 1,
952                sequence_number: 1,
953                original_total_chunks: 2,
954                received_total_chunks: 3,
955            }
956        ));
957    }
958
959    #[rstest]
960    #[tokio::test]
961    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
962        let (mut chunks, _) = two_chunks_message;
963        let mut decoder = ChunkedGelfDecoder {
964            max_length: Some(5),
965            ..Default::default()
966        };
967
968        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
969        assert!(frame.is_none());
970        let frame = decoder.decode_eof(&mut chunks[1]);
971        let error = frame.unwrap_err();
972        let downcasted_error = downcast_framing_error(&error);
973        assert!(matches!(
974            downcasted_error,
975            ChunkedGelfDecoderError::MaxLengthExceed {
976                message_id: 1,
977                sequence_number: 1,
978                length: 6,
979                max_length: 5,
980            }
981        ));
982        assert_eq!(decoder.state.lock().unwrap().len(), 0);
983    }
984
985    #[rstest]
986    #[tokio::test]
987    #[traced_test]
988    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
989        let (mut chunks, _) = two_chunks_message;
990        let mut decoder = ChunkedGelfDecoder::default();
991
992        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
993        assert!(frame.is_none());
994
995        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
996        assert!(frame.is_none());
997        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
998    }
999
1000    #[tokio::test]
1001    #[rstest]
1002    #[case::gzip(Compression::Gzip)]
1003    #[case::zlib(Compression::Zlib)]
1004    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
1005        let payload = (0..100).fold(String::new(), |mut payload, n| {
1006            write!(payload, "foo{n}").unwrap();
1007            payload
1008        });
1009        let compressed_payload = compression.compress(&payload);
1010        let mut decoder = ChunkedGelfDecoder::default();
1011
1012        let frame = decoder
1013            .decode_eof(&mut compressed_payload.into())
1014            .expect("decoding should not fail")
1015            .expect("decoding should return a frame");
1016
1017        assert_eq!(frame, payload);
1018    }
1019
1020    #[tokio::test]
1021    #[rstest]
1022    #[case::gzip(Compression::Gzip)]
1023    #[case::zlib(Compression::Zlib)]
1024    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
1025        let message_id = 1u64;
1026        let max_chunk_size = 5;
1027        let payload = (0..100).fold(String::new(), |mut payload, n| {
1028            write!(payload, "foo{n}").unwrap();
1029            payload
1030        });
1031        let compressed_payload = compression.compress(&payload);
1032        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
1033        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
1034        let mut chunks = compressed_payload
1035            .chunks(max_chunk_size)
1036            .enumerate()
1037            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
1038            .collect::<Vec<_>>();
1039        let (last_chunk, first_chunks) =
1040            chunks.split_last_mut().expect("chunks should not be empty");
1041        let mut decoder = ChunkedGelfDecoder::default();
1042
1043        for chunk in first_chunks {
1044            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
1045            assert!(frame.is_none());
1046        }
1047        let frame = decoder
1048            .decode_eof(last_chunk)
1049            .expect("decoding should not fail")
1050            .expect("decoding should return a frame");
1051
1052        assert_eq!(frame, payload);
1053    }
1054
1055    #[tokio::test]
1056    async fn decode_malformed_gzip_message() {
1057        let mut compressed_payload = BytesMut::new();
1058        compressed_payload.extend(GZIP_MAGIC);
1059        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
1060        let mut decoder = ChunkedGelfDecoder::default();
1061
1062        let error = decoder
1063            .decode_eof(&mut compressed_payload)
1064            .expect_err("decoding should fail");
1065
1066        let downcasted_error = downcast_framing_error(&error);
1067        assert!(matches!(
1068            downcasted_error,
1069            ChunkedGelfDecoderError::Decompression {
1070                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1071            }
1072        ));
1073    }
1074
1075    #[tokio::test]
1076    async fn decode_malformed_zlib_message() {
1077        let mut compressed_payload = BytesMut::new();
1078        compressed_payload.extend(ZLIB_MAGIC);
1079        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
1080        let mut decoder = ChunkedGelfDecoder::default();
1081
1082        let error = decoder
1083            .decode_eof(&mut compressed_payload)
1084            .expect_err("decoding should fail");
1085
1086        let downcasted_error = downcast_framing_error(&error);
1087        assert!(matches!(
1088            downcasted_error,
1089            ChunkedGelfDecoderError::Decompression {
1090                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1091            }
1092        ));
1093    }
1094
1095    #[tokio::test]
1096    async fn decode_zlib_payload_with_zlib_decoder() {
1097        let payload = "foo";
1098        let compressed_payload = Compression::Zlib.compress(&payload);
1099        let mut decoder = ChunkedGelfDecoder {
1100            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1101            ..Default::default()
1102        };
1103
1104        let frame = decoder
1105            .decode_eof(&mut compressed_payload.into())
1106            .expect("decoding should not fail")
1107            .expect("decoding should return a frame");
1108
1109        assert_eq!(frame, payload);
1110    }
1111
1112    #[tokio::test]
1113    async fn decode_gzip_payload_with_zlib_decoder() {
1114        let payload = "foo";
1115        let compressed_payload = Compression::Gzip.compress(&payload);
1116        let mut decoder = ChunkedGelfDecoder {
1117            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1118            ..Default::default()
1119        };
1120
1121        let error = decoder
1122            .decode_eof(&mut compressed_payload.into())
1123            .expect_err("decoding should fail");
1124
1125        let downcasted_error = downcast_framing_error(&error);
1126        assert!(matches!(
1127            downcasted_error,
1128            ChunkedGelfDecoderError::Decompression {
1129                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1130            }
1131        ));
1132    }
1133
1134    #[tokio::test]
1135    async fn decode_uncompressed_payload_with_zlib_decoder() {
1136        let payload = "foo";
1137        let mut decoder = ChunkedGelfDecoder {
1138            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1139            ..Default::default()
1140        };
1141
1142        let error = decoder
1143            .decode_eof(&mut payload.into())
1144            .expect_err("decoding should fail");
1145
1146        let downcasted_error = downcast_framing_error(&error);
1147        assert!(matches!(
1148            downcasted_error,
1149            ChunkedGelfDecoderError::Decompression {
1150                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1151            }
1152        ));
1153    }
1154
1155    #[tokio::test]
1156    async fn decode_gzip_payload_with_gzip_decoder() {
1157        let payload = "foo";
1158        let compressed_payload = Compression::Gzip.compress(&payload);
1159        let mut decoder = ChunkedGelfDecoder {
1160            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1161            ..Default::default()
1162        };
1163
1164        let frame = decoder
1165            .decode_eof(&mut compressed_payload.into())
1166            .expect("decoding should not fail")
1167            .expect("decoding should return a frame");
1168
1169        assert_eq!(frame, payload);
1170    }
1171
1172    #[tokio::test]
1173    async fn decode_zlib_payload_with_gzip_decoder() {
1174        let payload = "foo";
1175        let compressed_payload = Compression::Zlib.compress(&payload);
1176        let mut decoder = ChunkedGelfDecoder {
1177            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1178            ..Default::default()
1179        };
1180
1181        let error = decoder
1182            .decode_eof(&mut compressed_payload.into())
1183            .expect_err("decoding should fail");
1184
1185        let downcasted_error = downcast_framing_error(&error);
1186        assert!(matches!(
1187            downcasted_error,
1188            ChunkedGelfDecoderError::Decompression {
1189                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1190            }
1191        ));
1192    }
1193
1194    #[tokio::test]
1195    async fn decode_uncompressed_payload_with_gzip_decoder() {
1196        let payload = "foo";
1197        let mut decoder = ChunkedGelfDecoder {
1198            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1199            ..Default::default()
1200        };
1201
1202        let error = decoder
1203            .decode_eof(&mut payload.into())
1204            .expect_err("decoding should fail");
1205
1206        let downcasted_error = downcast_framing_error(&error);
1207        assert!(matches!(
1208            downcasted_error,
1209            ChunkedGelfDecoderError::Decompression {
1210                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1211            }
1212        ));
1213    }
1214
1215    #[tokio::test]
1216    #[rstest]
1217    #[case::gzip(Compression::Gzip)]
1218    #[case::zlib(Compression::Zlib)]
1219    async fn decode_compressed_payload_with_no_decompression_decoder(
1220        #[case] compression: Compression,
1221    ) {
1222        let payload = "foo";
1223        let compressed_payload = compression.compress(&payload);
1224        let mut decoder = ChunkedGelfDecoder {
1225            decompression_config: ChunkedGelfDecompressionConfig::None,
1226            ..Default::default()
1227        };
1228
1229        let frame = decoder
1230            .decode_eof(&mut compressed_payload.clone().into())
1231            .expect("decoding should not fail")
1232            .expect("decoding should return a frame");
1233
1234        assert_eq!(frame, compressed_payload);
1235    }
1236
1237    #[test]
1238    fn detect_gzip_compression() {
1239        let payload = "foo";
1240
1241        for level in 0..=9 {
1242            let level = flate2::Compression::new(level);
1243            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
1244            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1245            assert_eq!(
1246                actual,
1247                ChunkedGelfDecompression::Gzip,
1248                "Failed for level {}",
1249                level.level()
1250            );
1251        }
1252    }
1253
1254    #[test]
1255    fn detect_zlib_compression() {
1256        let payload = "foo";
1257
1258        for level in 0..=9 {
1259            let level = flate2::Compression::new(level);
1260            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
1261            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1262            assert_eq!(
1263                actual,
1264                ChunkedGelfDecompression::Zlib,
1265                "Failed for level {}",
1266                level.level()
1267            );
1268        }
1269    }
1270
1271    #[test]
1272    fn detect_no_compression() {
1273        let payload = "foo";
1274
1275        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());
1276
1277        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
1278    }
1279}