codecs/decoding/framing/
chunked_gelf.rs

1use std::{
2    any::Any,
3    collections::HashMap,
4    io::Read,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use derivative::Derivative;
11use flate2::read::{MultiGzDecoder, ZlibDecoder};
12use snafu::{ResultExt, Snafu, ensure};
13use tokio::{self, task::JoinHandle};
14use tokio_util::codec::Decoder;
15use tracing::{debug, trace, warn};
16use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
17use vector_config::configurable_component;
18
19use super::{BoxedFramingError, FramingError};
20use crate::{BytesDecoder, StreamDecodingError};
21
22const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
23const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
24const DEFAULT_TIMEOUT_SECS: f64 = 5.0;
25
26const fn default_timeout_secs() -> f64 {
27    DEFAULT_TIMEOUT_SECS
28}
29
30/// Config used to build a `ChunkedGelfDecoder`.
31#[configurable_component]
32#[derive(Debug, Clone, Default)]
33pub struct ChunkedGelfDecoderConfig {
34    /// Options for the chunked GELF decoder.
35    #[serde(default)]
36    pub chunked_gelf: ChunkedGelfDecoderOptions,
37}
38
39impl ChunkedGelfDecoderConfig {
40    /// Build the `ChunkedGelfDecoder` from this configuration.
41    pub fn build(&self) -> ChunkedGelfDecoder {
42        ChunkedGelfDecoder::new(
43            self.chunked_gelf.timeout_secs,
44            self.chunked_gelf.pending_messages_limit,
45            self.chunked_gelf.max_length,
46            self.chunked_gelf.decompression,
47        )
48    }
49}
50
51/// Options for building a `ChunkedGelfDecoder`.
52#[configurable_component]
53#[derive(Clone, Debug, Derivative)]
54#[derivative(Default)]
55pub struct ChunkedGelfDecoderOptions {
56    /// The timeout, in seconds, for a message to be fully received. If the timeout is reached, the
57    /// decoder drops all the received chunks of the timed out message.
58    #[serde(default = "default_timeout_secs")]
59    #[derivative(Default(value = "default_timeout_secs()"))]
60    pub timeout_secs: f64,
61
62    /// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
63    /// dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded.
64    /// If this option is not set, the decoder does not limit the number of pending messages and the memory usage
65    /// of its messages buffer can grow unbounded. This matches Graylog Server's behavior.
66    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
67    pub pending_messages_limit: Option<usize>,
68
69    /// The maximum length of a single GELF message, in bytes. Messages longer than this length will
70    /// be dropped. If this option is not set, the decoder does not limit the length of messages and
71    /// the per-message memory is unbounded.
72    ///
73    /// **Note**: A message can be composed of multiple chunks and this limit is applied to the whole
74    /// message, not to individual chunks.
75    ///
76    /// This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation.
77    /// The message's payload is the concatenation of all the chunks' payloads.
78    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
79    pub max_length: Option<usize>,
80
81    /// Decompression configuration for GELF messages.
82    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
83    pub decompression: ChunkedGelfDecompressionConfig,
84}
85
86/// Decompression options for ChunkedGelfDecoder.
87#[configurable_component]
88#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
89#[derivative(Default)]
90pub enum ChunkedGelfDecompressionConfig {
91    /// Automatically detect the decompression method based on the magic bytes of the message.
92    #[derivative(Default)]
93    Auto,
94    /// Use Gzip decompression.
95    Gzip,
96    /// Use Zlib decompression.
97    Zlib,
98    /// Do not decompress the message.
99    None,
100}
101
102impl ChunkedGelfDecompressionConfig {
103    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
104        match self {
105            Self::Auto => ChunkedGelfDecompression::from_magic(data),
106            Self::Gzip => ChunkedGelfDecompression::Gzip,
107            Self::Zlib => ChunkedGelfDecompression::Zlib,
108            Self::None => ChunkedGelfDecompression::None,
109        }
110    }
111}
112
113#[derive(Debug)]
114struct MessageState {
115    total_chunks: u8,
116    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
117    chunks_bitmap: u128,
118    current_length: usize,
119    timeout_task: JoinHandle<()>,
120}
121
122impl MessageState {
123    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
124        Self {
125            total_chunks,
126            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
127            chunks_bitmap: 0,
128            current_length: 0,
129            timeout_task,
130        }
131    }
132
133    fn is_chunk_present(&self, sequence_number: u8) -> bool {
134        let chunk_bitmap_id = 1 << sequence_number;
135        self.chunks_bitmap & chunk_bitmap_id != 0
136    }
137
138    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
139        let chunk_bitmap_id = 1 << sequence_number;
140        self.chunks_bitmap |= chunk_bitmap_id;
141        self.current_length += chunk.remaining();
142        self.chunks[sequence_number as usize] = chunk;
143    }
144
145    fn is_complete(&self) -> bool {
146        self.chunks_bitmap.count_ones() == self.total_chunks as u32
147    }
148
149    fn current_length(&self) -> usize {
150        self.current_length
151    }
152
153    fn retrieve_message(&self) -> Option<Bytes> {
154        if self.is_complete() {
155            self.timeout_task.abort();
156            let chunks = &self.chunks[0..self.total_chunks as usize];
157            let mut message = BytesMut::new();
158            for chunk in chunks {
159                message.extend_from_slice(chunk);
160            }
161            Some(message.freeze())
162        } else {
163            None
164        }
165    }
166}
167
168#[derive(Debug, PartialEq, Eq)]
169pub enum ChunkedGelfDecompression {
170    Gzip,
171    Zlib,
172    None,
173}
174
175impl ChunkedGelfDecompression {
176    pub fn from_magic(data: &Bytes) -> Self {
177        if data.starts_with(GZIP_MAGIC) {
178            trace!("Detected Gzip compression");
179            return Self::Gzip;
180        }
181
182        if data.starts_with(ZLIB_MAGIC) {
183            // Based on https://datatracker.ietf.org/doc/html/rfc1950#section-2.2
184            if let Some([first_byte, second_byte]) = data.get(0..2)
185                && (*first_byte as u16 * 256 + *second_byte as u16) % 31 == 0
186            {
187                trace!("Detected Zlib compression");
188                return Self::Zlib;
189            };
190
191            warn!(
192                "Detected Zlib magic bytes but the header is invalid: {:?}",
193                data.get(0..2)
194            );
195        };
196
197        trace!("No compression detected",);
198        Self::None
199    }
200
201    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
202        let decompressed = match self {
203            Self::Gzip => {
204                let mut decoder = MultiGzDecoder::new(data.reader());
205                let mut decompressed = Vec::new();
206                decoder
207                    .read_to_end(&mut decompressed)
208                    .context(GzipDecompressionSnafu)?;
209                Bytes::from(decompressed)
210            }
211            Self::Zlib => {
212                let mut decoder = ZlibDecoder::new(data.reader());
213                let mut decompressed = Vec::new();
214                decoder
215                    .read_to_end(&mut decompressed)
216                    .context(ZlibDecompressionSnafu)?;
217                Bytes::from(decompressed)
218            }
219            Self::None => data,
220        };
221        Ok(decompressed)
222    }
223}
224
225#[derive(Debug, Snafu)]
226pub enum ChunkedGelfDecompressionError {
227    #[snafu(display("Gzip decompression error: {source}"))]
228    GzipDecompression { source: std::io::Error },
229    #[snafu(display("Zlib decompression error: {source}"))]
230    ZlibDecompression { source: std::io::Error },
231}
232
233#[derive(Debug, Snafu)]
234pub enum ChunkedGelfDecoderError {
235    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
236    InvalidChunkHeader { header: Bytes },
237    #[snafu(display(
238        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
239    ))]
240    InvalidTotalChunks {
241        message_id: u64,
242        sequence_number: u8,
243        total_chunks: u8,
244    },
245    #[snafu(display(
246        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"
247    ))]
248    InvalidSequenceNumber {
249        message_id: u64,
250        sequence_number: u8,
251        total_chunks: u8,
252    },
253    #[snafu(display(
254        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
255    ))]
256    PendingMessagesLimitReached {
257        message_id: u64,
258        sequence_number: u8,
259        pending_messages_limit: usize,
260    },
261    #[snafu(display(
262        "Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"
263    ))]
264    TotalChunksMismatch {
265        message_id: u64,
266        sequence_number: u8,
267        original_total_chunks: u8,
268        received_total_chunks: u8,
269    },
270    #[snafu(display(
271        "Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"
272    ))]
273    MaxLengthExceed {
274        message_id: u64,
275        sequence_number: u8,
276        length: usize,
277        max_length: usize,
278    },
279    #[snafu(display("Error while decompressing message. {source}"))]
280    Decompression {
281        source: ChunkedGelfDecompressionError,
282    },
283}
284
285impl StreamDecodingError for ChunkedGelfDecoderError {
286    fn can_continue(&self) -> bool {
287        true
288    }
289}
290
291impl FramingError for ChunkedGelfDecoderError {
292    fn as_any(&self) -> &dyn Any {
293        self as &dyn Any
294    }
295}
296
297/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP)
298/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go).
299#[derive(Debug, Clone)]
300pub struct ChunkedGelfDecoder {
301    // We have to use this decoder to read all the bytes from the buffer first and don't let tokio
302    // read it buffered, as tokio FramedRead will not always call the decode method with the
303    // whole message. (see https://docs.rs/tokio-util/latest/src/tokio_util/codec/framed_impl.rs.html#26).
304    // This limitation is due to the fact that the GELF format does not specify the length of the
305    // message, so we have to read all the bytes from the message (datagram)
306    bytes_decoder: BytesDecoder,
307    decompression_config: ChunkedGelfDecompressionConfig,
308    state: Arc<Mutex<HashMap<u64, MessageState>>>,
309    timeout: Duration,
310    pending_messages_limit: Option<usize>,
311    max_length: Option<usize>,
312}
313
314impl ChunkedGelfDecoder {
315    /// Creates a new `ChunkedGelfDecoder`.
316    pub fn new(
317        timeout_secs: f64,
318        pending_messages_limit: Option<usize>,
319        max_length: Option<usize>,
320        decompression_config: ChunkedGelfDecompressionConfig,
321    ) -> Self {
322        Self {
323            bytes_decoder: BytesDecoder::new(),
324            decompression_config,
325            state: Arc::new(Mutex::new(HashMap::new())),
326            timeout: Duration::from_secs_f64(timeout_secs),
327            pending_messages_limit,
328            max_length,
329        }
330    }
331
332    /// Decode a GELF chunk
333    pub fn decode_chunk(
334        &mut self,
335        mut chunk: Bytes,
336    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
337        // Encoding scheme:
338        //
339        // +------------+-----------------+--------------+----------------------+
340        // | Message id | Sequence number | Total chunks |    Chunk payload     |
341        // +------------+-----------------+--------------+----------------------+
342        // | 64 bits    | 8 bits          | 8 bits       | remaining bits       |
343        // +------------+-----------------+--------------+----------------------+
344        //
345        // As this codec is oriented for UDP, the chunks (datagrams) are not guaranteed to be received in order,
346        // nor to be received at all. So, we have to store the chunks in a buffer (state field) until we receive
347        // all the chunks of a message. When we receive all the chunks of a message, we can concatenate them
348        // and return the complete payload.
349
350        // We need 10 bytes to read the message id, sequence number and total chunks
351        ensure!(
352            chunk.remaining() >= 10,
353            InvalidChunkHeaderSnafu { header: chunk }
354        );
355
356        let message_id = chunk.get_u64();
357        let sequence_number = chunk.get_u8();
358        let total_chunks = chunk.get_u8();
359
360        ensure!(
361            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
362            InvalidTotalChunksSnafu {
363                message_id,
364                sequence_number,
365                total_chunks
366            }
367        );
368
369        ensure!(
370            sequence_number < total_chunks,
371            InvalidSequenceNumberSnafu {
372                message_id,
373                sequence_number,
374                total_chunks
375            }
376        );
377
378        let mut state_lock = self.state.lock().expect("poisoned lock");
379
380        if let Some(pending_messages_limit) = self.pending_messages_limit {
381            ensure!(
382                state_lock.len() < pending_messages_limit,
383                PendingMessagesLimitReachedSnafu {
384                    message_id,
385                    sequence_number,
386                    pending_messages_limit
387                }
388            );
389        }
390
391        let message_state = state_lock.entry(message_id).or_insert_with(|| {
392            // We need to spawn a task that will clear the message state after a certain time
393            // otherwise we will have a memory leak due to messages that never complete
394            let state = Arc::clone(&self.state);
395            let timeout = self.timeout;
396            let timeout_handle = tokio::spawn(async move {
397                tokio::time::sleep(timeout).await;
398                let mut state_lock = state.lock().expect("poisoned lock");
399                if state_lock.remove(&message_id).is_some() {
400                    warn!(
401                        message_id = message_id,
402                        timeout_secs = timeout.as_secs_f64(),
403                        internal_log_rate_limit = true,
404                        "Message was not fully received within the timeout window. Discarding it."
405                    );
406                }
407            });
408            MessageState::new(total_chunks, timeout_handle)
409        });
410
411        ensure!(
412            message_state.total_chunks == total_chunks,
413            TotalChunksMismatchSnafu {
414                message_id,
415                sequence_number,
416                original_total_chunks: message_state.total_chunks,
417                received_total_chunks: total_chunks
418            }
419        );
420
421        if message_state.is_chunk_present(sequence_number) {
422            debug!(
423                message_id = message_id,
424                sequence_number = sequence_number,
425                internal_log_rate_limit = true,
426                "Received a duplicate chunk. Ignoring it."
427            );
428            return Ok(None);
429        }
430
431        message_state.add_chunk(sequence_number, chunk);
432
433        if let Some(max_length) = self.max_length {
434            let length = message_state.current_length();
435            if length > max_length {
436                state_lock.remove(&message_id);
437                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
438                    message_id,
439                    sequence_number,
440                    length,
441                    max_length,
442                });
443            }
444        }
445
446        if let Some(message) = message_state.retrieve_message() {
447            state_lock.remove(&message_id);
448            Ok(Some(message))
449        } else {
450            Ok(None)
451        }
452    }
453
454    /// Decode a GELF message that may be chunked or not. The source bytes are expected to be
455    /// datagram-based (or message-based), so it must not contain multiple GELF messages
456    /// delimited by '\0', such as it would be in a stream-based protocol.
457    pub fn decode_message(
458        &mut self,
459        mut src: Bytes,
460    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
461        let message = if src.starts_with(GELF_MAGIC) {
462            trace!("Received a chunked GELF message based on the magic bytes");
463            src.advance(2);
464            self.decode_chunk(src)?
465        } else {
466            trace!(
467                "Received an unchunked GELF message. First two bytes of message: {:?}",
468                &src[0..2]
469            );
470            Some(src)
471        };
472
473        // We can have both chunked and unchunked messages that are compressed
474        message
475            .map(|message| {
476                self.decompression_config
477                    .get_decompression(&message)
478                    .decompress(message)
479                    .context(DecompressionSnafu)
480            })
481            .transpose()
482    }
483}
484
485impl Default for ChunkedGelfDecoder {
486    fn default() -> Self {
487        Self::new(
488            DEFAULT_TIMEOUT_SECS,
489            None,
490            None,
491            ChunkedGelfDecompressionConfig::Auto,
492        )
493    }
494}
495
496impl Decoder for ChunkedGelfDecoder {
497    type Item = Bytes;
498
499    type Error = BoxedFramingError;
500
501    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
502        if src.is_empty() {
503            return Ok(None);
504        }
505
506        Ok(self
507            .bytes_decoder
508            .decode(src)?
509            .and_then(|frame| self.decode_message(frame).transpose())
510            .transpose()?)
511    }
512    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
513        if buf.is_empty() {
514            return Ok(None);
515        }
516
517        Ok(self
518            .bytes_decoder
519            .decode_eof(buf)?
520            .and_then(|frame| self.decode_message(frame).transpose())
521            .transpose()?)
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use std::{fmt::Write as FmtWrite, io::Write as IoWrite};
528
529    use bytes::{BufMut, BytesMut};
530    use flate2::write::{GzEncoder, ZlibEncoder};
531    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
532    use rstest::{fixture, rstest};
533    use tracing_test::traced_test;
534
535    use super::*;
536
537    pub enum Compression {
538        Gzip,
539        Zlib,
540    }
541
542    impl Compression {
543        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
544            self.compress_with_level(payload, flate2::Compression::default())
545        }
546
547        pub fn compress_with_level(
548            &self,
549            payload: &impl AsRef<[u8]>,
550            level: flate2::Compression,
551        ) -> Bytes {
552            match self {
553                Compression::Gzip => {
554                    let mut encoder = GzEncoder::new(Vec::new(), level);
555                    encoder
556                        .write_all(payload.as_ref())
557                        .expect("failed to write to encoder");
558                    encoder.finish().expect("failed to finish encoder").into()
559                }
560                Compression::Zlib => {
561                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
562                    encoder
563                        .write_all(payload.as_ref())
564                        .expect("failed to write to encoder");
565                    encoder.finish().expect("failed to finish encoder").into()
566                }
567            }
568        }
569    }
570
571    fn create_chunk(
572        message_id: u64,
573        sequence_number: u8,
574        total_chunks: u8,
575        payload: &impl AsRef<[u8]>,
576    ) -> BytesMut {
577        let mut chunk = BytesMut::new();
578        chunk.put_slice(GELF_MAGIC);
579        chunk.put_u64(message_id);
580        chunk.put_u8(sequence_number);
581        chunk.put_u8(total_chunks);
582        chunk.extend_from_slice(payload.as_ref());
583        chunk
584    }
585
586    #[fixture]
587    fn unchunked_message() -> (BytesMut, String) {
588        let payload = "foo";
589        (BytesMut::from(payload), payload.to_string())
590    }
591
592    #[fixture]
593    fn two_chunks_message() -> ([BytesMut; 2], String) {
594        let message_id = 1u64;
595        let total_chunks = 2u8;
596
597        let first_sequence_number = 0u8;
598        let first_payload = "foo";
599        let first_chunk = create_chunk(
600            message_id,
601            first_sequence_number,
602            total_chunks,
603            &first_payload,
604        );
605
606        let second_sequence_number = 1u8;
607        let second_payload = "bar";
608        let second_chunk = create_chunk(
609            message_id,
610            second_sequence_number,
611            total_chunks,
612            &second_payload,
613        );
614
615        (
616            [first_chunk, second_chunk],
617            format!("{first_payload}{second_payload}"),
618        )
619    }
620
621    #[fixture]
622    fn three_chunks_message() -> ([BytesMut; 3], String) {
623        let message_id = 2u64;
624        let total_chunks = 3u8;
625
626        let first_sequence_number = 0u8;
627        let first_payload = "foo";
628        let first_chunk = create_chunk(
629            message_id,
630            first_sequence_number,
631            total_chunks,
632            &first_payload,
633        );
634
635        let second_sequence_number = 1u8;
636        let second_payload = "bar";
637        let second_chunk = create_chunk(
638            message_id,
639            second_sequence_number,
640            total_chunks,
641            &second_payload,
642        );
643
644        let third_sequence_number = 2u8;
645        let third_payload = "baz";
646        let third_chunk = create_chunk(
647            message_id,
648            third_sequence_number,
649            total_chunks,
650            &third_payload,
651        );
652
653        (
654            [first_chunk, second_chunk, third_chunk],
655            format!("{first_payload}{second_payload}{third_payload}"),
656        )
657    }
658
659    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
660        error
661            .as_any()
662            .downcast_ref::<ChunkedGelfDecoderError>()
663            .expect("Expected ChunkedGelfDecoderError to be downcasted")
664    }
665
666    #[rstest]
667    #[tokio::test]
668    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
669        let (mut chunks, expected_message) = two_chunks_message;
670        let mut decoder = ChunkedGelfDecoder::default();
671
672        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
673        assert!(frame.is_none());
674
675        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
676        assert_eq!(frame, Some(Bytes::from(expected_message)));
677    }
678
679    #[rstest]
680    #[tokio::test]
681    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
682        let (mut message, expected_message) = unchunked_message;
683        let mut decoder = ChunkedGelfDecoder::default();
684
685        let frame = decoder.decode_eof(&mut message).unwrap();
686        assert_eq!(frame, Some(Bytes::from(expected_message)));
687    }
688
689    #[rstest]
690    #[tokio::test]
691    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
692        let (mut chunks, expected_message) = two_chunks_message;
693        let mut decoder = ChunkedGelfDecoder::default();
694
695        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
696        assert!(frame.is_none());
697
698        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
699        assert_eq!(frame, Some(Bytes::from(expected_message)));
700    }
701
702    #[rstest]
703    #[tokio::test]
704    async fn decode_unordered_messages(
705        two_chunks_message: ([BytesMut; 2], String),
706        three_chunks_message: ([BytesMut; 3], String),
707    ) {
708        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
709        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
710        let mut decoder = ChunkedGelfDecoder::default();
711
712        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
713        assert!(frame.is_none());
714
715        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
716        assert!(frame.is_none());
717
718        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
719        assert!(frame.is_none());
720
721        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
722        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));
723
724        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
725        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
726    }
727
728    #[rstest]
729    #[tokio::test]
730    async fn decode_mixed_chunked_and_unchunked_messages(
731        unchunked_message: (BytesMut, String),
732        two_chunks_message: ([BytesMut; 2], String),
733    ) {
734        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
735        let (mut chunks, expected_chunked_message) = two_chunks_message;
736        let mut decoder = ChunkedGelfDecoder::default();
737
738        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
739        assert!(frame.is_none());
740
741        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
742        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));
743
744        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
745        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
746    }
747
748    #[tokio::test]
749    async fn decode_shuffled_messages() {
750        let mut rng = SmallRng::seed_from_u64(42);
751        let total_chunks = 100u8;
752        let first_message_id = 1u64;
753        let first_payload = "first payload";
754        let second_message_id = 2u64;
755        let second_payload = "second payload";
756        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
757            create_chunk(
758                first_message_id,
759                sequence_number,
760                total_chunks,
761                &first_payload,
762            )
763        });
764        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
765            create_chunk(
766                second_message_id,
767                sequence_number,
768                total_chunks,
769                &second_payload,
770            )
771        });
772        let expected_first_message = first_payload.repeat(total_chunks as usize);
773        let expected_second_message = second_payload.repeat(total_chunks as usize);
774        let mut merged_chunks = first_message_chunks
775            .chain(second_message_chunks)
776            .collect::<Vec<_>>();
777        merged_chunks.shuffle(&mut rng);
778        let mut decoder = ChunkedGelfDecoder::default();
779
780        let mut count = 0;
781        let first_retrieved_message = loop {
782            assert!(count < 2 * total_chunks as usize);
783            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
784                break message;
785            } else {
786                count += 1;
787            }
788        };
789        let second_retrieved_message = loop {
790            assert!(count < 2 * total_chunks as usize);
791            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
792                break message;
793            } else {
794                count += 1
795            }
796        };
797
798        assert_eq!(second_retrieved_message, expected_first_message);
799        assert_eq!(first_retrieved_message, expected_second_message);
800    }
801
802    #[rstest]
803    #[tokio::test(start_paused = true)]
804    #[traced_test]
805    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
806        let (mut chunks, _) = two_chunks_message;
807        let mut decoder = ChunkedGelfDecoder::default();
808
809        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
810        assert!(frame.is_none());
811        assert!(!decoder.state.lock().unwrap().is_empty());
812
813        // The message state should be cleared after a certain time
814        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
815        assert!(decoder.state.lock().unwrap().is_empty());
816        assert!(logs_contain(
817            "Message was not fully received within the timeout window. Discarding it."
818        ));
819
820        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
821        assert!(frame.is_none());
822
823        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
824        assert!(decoder.state.lock().unwrap().is_empty());
825        assert!(logs_contain(
826            "Message was not fully received within the timeout window. Discarding it"
827        ));
828    }
829
830    #[tokio::test]
831    async fn decode_empty_input() {
832        let mut src = BytesMut::new();
833        let mut decoder = ChunkedGelfDecoder::default();
834
835        let frame = decoder.decode_eof(&mut src).unwrap();
836        assert!(frame.is_none());
837    }
838
839    #[tokio::test]
840    async fn decode_chunk_with_invalid_header() {
841        let mut src = BytesMut::new();
842        src.extend_from_slice(GELF_MAGIC);
843        // Invalid chunk header with less than 10 bytes
844        let invalid_chunk = [0x12, 0x34];
845        src.extend_from_slice(&invalid_chunk);
846        let mut decoder = ChunkedGelfDecoder::default();
847        let frame = decoder.decode_eof(&mut src);
848
849        let error = frame.unwrap_err();
850        let downcasted_error = downcast_framing_error(&error);
851        assert!(matches!(
852            downcasted_error,
853            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
854        ));
855    }
856
857    #[tokio::test]
858    async fn decode_chunk_with_invalid_total_chunks() {
859        let message_id = 1u64;
860        let sequence_number = 1u8;
861        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
862        let payload = "foo";
863        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
864        let mut decoder = ChunkedGelfDecoder::default();
865
866        let frame = decoder.decode_eof(&mut chunk);
867        let error = frame.unwrap_err();
868        let downcasted_error = downcast_framing_error(&error);
869        assert!(matches!(
870            downcasted_error,
871            ChunkedGelfDecoderError::InvalidTotalChunks {
872                message_id: 1,
873                sequence_number: 1,
874                total_chunks: 129,
875            }
876        ));
877    }
878
879    #[tokio::test]
880    async fn decode_chunk_with_invalid_sequence_number() {
881        let message_id = 1u64;
882        let total_chunks = 2u8;
883        let invalid_sequence_number = total_chunks + 1;
884        let payload = "foo";
885        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
886        let mut decoder = ChunkedGelfDecoder::default();
887
888        let frame = decoder.decode_eof(&mut chunk);
889        let error = frame.unwrap_err();
890        let downcasted_error = downcast_framing_error(&error);
891        assert!(matches!(
892            downcasted_error,
893            ChunkedGelfDecoderError::InvalidSequenceNumber {
894                message_id: 1,
895                sequence_number: 3,
896                total_chunks: 2,
897            }
898        ));
899    }
900
901    #[rstest]
902    #[tokio::test]
903    async fn decode_reached_pending_messages_limit(
904        two_chunks_message: ([BytesMut; 2], String),
905        three_chunks_message: ([BytesMut; 3], String),
906    ) {
907        let (mut two_chunks, _) = two_chunks_message;
908        let (mut three_chunks, _) = three_chunks_message;
909        let mut decoder = ChunkedGelfDecoder {
910            pending_messages_limit: Some(1),
911            ..Default::default()
912        };
913
914        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
915        assert!(frame.is_none());
916        assert!(decoder.state.lock().unwrap().len() == 1);
917
918        let frame = decoder.decode_eof(&mut three_chunks[0]);
919        let error = frame.unwrap_err();
920        let downcasted_error = downcast_framing_error(&error);
921        assert!(matches!(
922            downcasted_error,
923            ChunkedGelfDecoderError::PendingMessagesLimitReached {
924                message_id: 2u64,
925                sequence_number: 0u8,
926                pending_messages_limit: 1,
927            }
928        ));
929        assert!(decoder.state.lock().unwrap().len() == 1);
930    }
931
932    #[rstest]
933    #[tokio::test]
934    async fn decode_chunk_with_different_total_chunks() {
935        let message_id = 1u64;
936        let sequence_number = 0u8;
937        let total_chunks = 2u8;
938        let payload = "foo";
939        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
940        let mut second_chunk =
941            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
942        let mut decoder = ChunkedGelfDecoder::default();
943
944        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
945        assert!(frame.is_none());
946
947        let frame = decoder.decode_eof(&mut second_chunk);
948        let error = frame.unwrap_err();
949        let downcasted_error = downcast_framing_error(&error);
950        assert!(matches!(
951            downcasted_error,
952            ChunkedGelfDecoderError::TotalChunksMismatch {
953                message_id: 1,
954                sequence_number: 1,
955                original_total_chunks: 2,
956                received_total_chunks: 3,
957            }
958        ));
959    }
960
961    #[rstest]
962    #[tokio::test]
963    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
964        let (mut chunks, _) = two_chunks_message;
965        let mut decoder = ChunkedGelfDecoder {
966            max_length: Some(5),
967            ..Default::default()
968        };
969
970        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
971        assert!(frame.is_none());
972        let frame = decoder.decode_eof(&mut chunks[1]);
973        let error = frame.unwrap_err();
974        let downcasted_error = downcast_framing_error(&error);
975        assert!(matches!(
976            downcasted_error,
977            ChunkedGelfDecoderError::MaxLengthExceed {
978                message_id: 1,
979                sequence_number: 1,
980                length: 6,
981                max_length: 5,
982            }
983        ));
984        assert_eq!(decoder.state.lock().unwrap().len(), 0);
985    }
986
987    #[rstest]
988    #[tokio::test]
989    #[traced_test]
990    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
991        let (mut chunks, _) = two_chunks_message;
992        let mut decoder = ChunkedGelfDecoder::default();
993
994        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
995        assert!(frame.is_none());
996
997        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
998        assert!(frame.is_none());
999        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
1000    }
1001
1002    #[tokio::test]
1003    #[rstest]
1004    #[case::gzip(Compression::Gzip)]
1005    #[case::zlib(Compression::Zlib)]
1006    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
1007        let payload = (0..100).fold(String::new(), |mut payload, n| {
1008            write!(payload, "foo{n}").unwrap();
1009            payload
1010        });
1011        let compressed_payload = compression.compress(&payload);
1012        let mut decoder = ChunkedGelfDecoder::default();
1013
1014        let frame = decoder
1015            .decode_eof(&mut compressed_payload.into())
1016            .expect("decoding should not fail")
1017            .expect("decoding should return a frame");
1018
1019        assert_eq!(frame, payload);
1020    }
1021
1022    #[tokio::test]
1023    #[rstest]
1024    #[case::gzip(Compression::Gzip)]
1025    #[case::zlib(Compression::Zlib)]
1026    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
1027        let message_id = 1u64;
1028        let max_chunk_size = 5;
1029        let payload = (0..100).fold(String::new(), |mut payload, n| {
1030            write!(payload, "foo{n}").unwrap();
1031            payload
1032        });
1033        let compressed_payload = compression.compress(&payload);
1034        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
1035        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
1036        let mut chunks = compressed_payload
1037            .chunks(max_chunk_size)
1038            .enumerate()
1039            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
1040            .collect::<Vec<_>>();
1041        let (last_chunk, first_chunks) =
1042            chunks.split_last_mut().expect("chunks should not be empty");
1043        let mut decoder = ChunkedGelfDecoder::default();
1044
1045        for chunk in first_chunks {
1046            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
1047            assert!(frame.is_none());
1048        }
1049        let frame = decoder
1050            .decode_eof(last_chunk)
1051            .expect("decoding should not fail")
1052            .expect("decoding should return a frame");
1053
1054        assert_eq!(frame, payload);
1055    }
1056
1057    #[tokio::test]
1058    async fn decode_malformed_gzip_message() {
1059        let mut compressed_payload = BytesMut::new();
1060        compressed_payload.extend(GZIP_MAGIC);
1061        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
1062        let mut decoder = ChunkedGelfDecoder::default();
1063
1064        let error = decoder
1065            .decode_eof(&mut compressed_payload)
1066            .expect_err("decoding should fail");
1067
1068        let downcasted_error = downcast_framing_error(&error);
1069        assert!(matches!(
1070            downcasted_error,
1071            ChunkedGelfDecoderError::Decompression {
1072                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1073            }
1074        ));
1075    }
1076
1077    #[tokio::test]
1078    async fn decode_malformed_zlib_message() {
1079        let mut compressed_payload = BytesMut::new();
1080        compressed_payload.extend(ZLIB_MAGIC);
1081        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
1082        let mut decoder = ChunkedGelfDecoder::default();
1083
1084        let error = decoder
1085            .decode_eof(&mut compressed_payload)
1086            .expect_err("decoding should fail");
1087
1088        let downcasted_error = downcast_framing_error(&error);
1089        assert!(matches!(
1090            downcasted_error,
1091            ChunkedGelfDecoderError::Decompression {
1092                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1093            }
1094        ));
1095    }
1096
1097    #[tokio::test]
1098    async fn decode_zlib_payload_with_zlib_decoder() {
1099        let payload = "foo";
1100        let compressed_payload = Compression::Zlib.compress(&payload);
1101        let mut decoder = ChunkedGelfDecoder {
1102            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1103            ..Default::default()
1104        };
1105
1106        let frame = decoder
1107            .decode_eof(&mut compressed_payload.into())
1108            .expect("decoding should not fail")
1109            .expect("decoding should return a frame");
1110
1111        assert_eq!(frame, payload);
1112    }
1113
1114    #[tokio::test]
1115    async fn decode_gzip_payload_with_zlib_decoder() {
1116        let payload = "foo";
1117        let compressed_payload = Compression::Gzip.compress(&payload);
1118        let mut decoder = ChunkedGelfDecoder {
1119            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1120            ..Default::default()
1121        };
1122
1123        let error = decoder
1124            .decode_eof(&mut compressed_payload.into())
1125            .expect_err("decoding should fail");
1126
1127        let downcasted_error = downcast_framing_error(&error);
1128        assert!(matches!(
1129            downcasted_error,
1130            ChunkedGelfDecoderError::Decompression {
1131                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1132            }
1133        ));
1134    }
1135
1136    #[tokio::test]
1137    async fn decode_uncompressed_payload_with_zlib_decoder() {
1138        let payload = "foo";
1139        let mut decoder = ChunkedGelfDecoder {
1140            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1141            ..Default::default()
1142        };
1143
1144        let error = decoder
1145            .decode_eof(&mut payload.into())
1146            .expect_err("decoding should fail");
1147
1148        let downcasted_error = downcast_framing_error(&error);
1149        assert!(matches!(
1150            downcasted_error,
1151            ChunkedGelfDecoderError::Decompression {
1152                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1153            }
1154        ));
1155    }
1156
1157    #[tokio::test]
1158    async fn decode_gzip_payload_with_gzip_decoder() {
1159        let payload = "foo";
1160        let compressed_payload = Compression::Gzip.compress(&payload);
1161        let mut decoder = ChunkedGelfDecoder {
1162            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1163            ..Default::default()
1164        };
1165
1166        let frame = decoder
1167            .decode_eof(&mut compressed_payload.into())
1168            .expect("decoding should not fail")
1169            .expect("decoding should return a frame");
1170
1171        assert_eq!(frame, payload);
1172    }
1173
1174    #[tokio::test]
1175    async fn decode_zlib_payload_with_gzip_decoder() {
1176        let payload = "foo";
1177        let compressed_payload = Compression::Zlib.compress(&payload);
1178        let mut decoder = ChunkedGelfDecoder {
1179            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1180            ..Default::default()
1181        };
1182
1183        let error = decoder
1184            .decode_eof(&mut compressed_payload.into())
1185            .expect_err("decoding should fail");
1186
1187        let downcasted_error = downcast_framing_error(&error);
1188        assert!(matches!(
1189            downcasted_error,
1190            ChunkedGelfDecoderError::Decompression {
1191                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1192            }
1193        ));
1194    }
1195
1196    #[tokio::test]
1197    async fn decode_uncompressed_payload_with_gzip_decoder() {
1198        let payload = "foo";
1199        let mut decoder = ChunkedGelfDecoder {
1200            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1201            ..Default::default()
1202        };
1203
1204        let error = decoder
1205            .decode_eof(&mut payload.into())
1206            .expect_err("decoding should fail");
1207
1208        let downcasted_error = downcast_framing_error(&error);
1209        assert!(matches!(
1210            downcasted_error,
1211            ChunkedGelfDecoderError::Decompression {
1212                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1213            }
1214        ));
1215    }
1216
1217    #[tokio::test]
1218    #[rstest]
1219    #[case::gzip(Compression::Gzip)]
1220    #[case::zlib(Compression::Zlib)]
1221    async fn decode_compressed_payload_with_no_decompression_decoder(
1222        #[case] compression: Compression,
1223    ) {
1224        let payload = "foo";
1225        let compressed_payload = compression.compress(&payload);
1226        let mut decoder = ChunkedGelfDecoder {
1227            decompression_config: ChunkedGelfDecompressionConfig::None,
1228            ..Default::default()
1229        };
1230
1231        let frame = decoder
1232            .decode_eof(&mut compressed_payload.clone().into())
1233            .expect("decoding should not fail")
1234            .expect("decoding should return a frame");
1235
1236        assert_eq!(frame, compressed_payload);
1237    }
1238
1239    #[test]
1240    fn detect_gzip_compression() {
1241        let payload = "foo";
1242
1243        for level in 0..=9 {
1244            let level = flate2::Compression::new(level);
1245            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
1246            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1247            assert_eq!(
1248                actual,
1249                ChunkedGelfDecompression::Gzip,
1250                "Failed for level {}",
1251                level.level()
1252            );
1253        }
1254    }
1255
1256    #[test]
1257    fn detect_zlib_compression() {
1258        let payload = "foo";
1259
1260        for level in 0..=9 {
1261            let level = flate2::Compression::new(level);
1262            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
1263            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1264            assert_eq!(
1265                actual,
1266                ChunkedGelfDecompression::Zlib,
1267                "Failed for level {}",
1268                level.level()
1269            );
1270        }
1271    }
1272
1273    #[test]
1274    fn detect_no_compression() {
1275        let payload = "foo";
1276
1277        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());
1278
1279        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
1280    }
1281}