codecs/decoding/framing/
chunked_gelf.rs

1use super::{BoxedFramingError, FramingError};
2use crate::{BytesDecoder, StreamDecodingError};
3use bytes::{Buf, Bytes, BytesMut};
4use derivative::Derivative;
5use flate2::read::{MultiGzDecoder, ZlibDecoder};
6use snafu::{ensure, ResultExt, Snafu};
7use std::any::Any;
8use std::collections::HashMap;
9use std::io::Read;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use tokio;
13use tokio::task::JoinHandle;
14use tokio_util::codec::Decoder;
15use tracing::{debug, trace, warn};
16use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
17use vector_config::configurable_component;
18
/// Magic bytes that prefix every chunked GELF datagram; `decode_message` uses them to tell
/// chunked messages apart from unchunked ones.
const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
/// Largest accepted value of a chunk's "total chunks" header field. It also sizes the
/// per-message chunk array and matches the width of the `u128` bitmap of received chunks.
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
/// Default number of seconds to wait for all the chunks of a message before discarding it.
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;

/// Returns [`DEFAULT_TIMEOUT_SECS`]. It is referenced by name from the `serde` and
/// `derivative` default attributes of `ChunkedGelfDecoderOptions::timeout_secs`.
const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}
26
// NOTE(review): the `///` text on this type and its field is presumably consumed by
// `#[configurable_component]` to describe the configuration — confirm before rewording it.
/// Config used to build a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    // `#[serde(default)]` lets the whole `chunked_gelf` section be omitted.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}
35
36impl ChunkedGelfDecoderConfig {
37    /// Build the `ChunkedGelfDecoder` from this configuration.
38    pub fn build(&self) -> ChunkedGelfDecoder {
39        ChunkedGelfDecoder::new(
40            self.chunked_gelf.timeout_secs,
41            self.chunked_gelf.pending_messages_limit,
42            self.chunked_gelf.max_length,
43            self.chunked_gelf.decompression,
44        )
45    }
46}
47
/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a message to be fully received. If the timeout is reached, the
    /// decoder drops all the received chunks of the timed out message.
    // `ChunkedGelfDecoder::new` converts this value with `Duration::from_secs_f64`, which panics
    // on negative or non-finite input.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
    /// dropping chunks of new messages, ensuring the memory usage of the decoder's state is bounded.
    /// If this option is not set, the decoder does not limit the number of pending messages and the memory usage
    /// of its messages buffer can grow unbounded. This matches Graylog Server's behavior.
    // Checked in `decode_chunk` against the number of entries in the decoder's state map.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length of a single GELF message, in bytes. Messages longer than this length will
    /// be dropped. If this option is not set, the decoder does not limit the length of messages and
    /// the per-message memory is unbounded.
    ///
    /// **Note**: A message can be composed of multiple chunks and this limit is applied to the whole
    /// message, not to individual chunks.
    ///
    /// This limit takes only into account the message's payload and the GELF header bytes are excluded from the calculation.
    /// The message's payload is the concatenation of all the chunks' payloads.
    // Checked in `decode_chunk` against the accumulated chunk payload length, i.e. before
    // `decode_message` decompresses the reassembled message.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// Decompression configuration for GELF messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}
82
// Resolved into a concrete `ChunkedGelfDecompression` for each message by `get_decompression`.
/// Decompression options for ChunkedGelfDecoder.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the decompression method based on the magic bytes of the message.
    // This is the default variant (see the `derivative` attribute below).
    #[derivative(Default)]
    Auto,
    /// Use Gzip decompression.
    Gzip,
    /// Use Zlib decompression.
    Zlib,
    /// Do not decompress the message.
    None,
}
98
99impl ChunkedGelfDecompressionConfig {
100    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
101        match self {
102            Self::Auto => ChunkedGelfDecompression::from_magic(data),
103            Self::Gzip => ChunkedGelfDecompression::Gzip,
104            Self::Zlib => ChunkedGelfDecompression::Zlib,
105            Self::None => ChunkedGelfDecompression::None,
106        }
107    }
108}
109
/// Reassembly state of one chunked message that has not been completely received yet.
#[derive(Debug)]
struct MessageState {
    /// Number of chunks announced by the chunk that created this state.
    total_chunks: u8,
    /// Chunk payloads, stored at the index given by their sequence number. Slots that have
    /// not been filled yet hold an empty `Bytes`.
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    /// Bit `n` is set once the chunk with sequence number `n` has been stored.
    chunks_bitmap: u128,
    /// Sum of the payload lengths of the stored chunks, in bytes.
    current_length: usize,
    /// Handle of the spawned task that removes this state once the timeout elapses.
    timeout_task: JoinHandle<()>,
}
118
119impl MessageState {
120    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
121        Self {
122            total_chunks,
123            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
124            chunks_bitmap: 0,
125            current_length: 0,
126            timeout_task,
127        }
128    }
129
130    fn is_chunk_present(&self, sequence_number: u8) -> bool {
131        let chunk_bitmap_id = 1 << sequence_number;
132        self.chunks_bitmap & chunk_bitmap_id != 0
133    }
134
135    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
136        let chunk_bitmap_id = 1 << sequence_number;
137        self.chunks_bitmap |= chunk_bitmap_id;
138        self.current_length += chunk.remaining();
139        self.chunks[sequence_number as usize] = chunk;
140    }
141
142    fn is_complete(&self) -> bool {
143        self.chunks_bitmap.count_ones() == self.total_chunks as u32
144    }
145
146    fn current_length(&self) -> usize {
147        self.current_length
148    }
149
150    fn retrieve_message(&self) -> Option<Bytes> {
151        if self.is_complete() {
152            self.timeout_task.abort();
153            let chunks = &self.chunks[0..self.total_chunks as usize];
154            let mut message = BytesMut::new();
155            for chunk in chunks {
156                message.extend_from_slice(chunk);
157            }
158            Some(message.freeze())
159        } else {
160            None
161        }
162    }
163}
164
/// Concrete decompression to apply to a complete GELF message, either configured explicitly or
/// detected by [`ChunkedGelfDecompression::from_magic`].
#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    /// Decompress the message as Gzip.
    Gzip,
    /// Decompress the message as Zlib.
    Zlib,
    /// Leave the message untouched.
    None,
}
171
172impl ChunkedGelfDecompression {
173    pub fn from_magic(data: &Bytes) -> Self {
174        if data.starts_with(GZIP_MAGIC) {
175            trace!("Detected Gzip compression");
176            return Self::Gzip;
177        }
178
179        if data.starts_with(ZLIB_MAGIC) {
180            // Based on https://datatracker.ietf.org/doc/html/rfc1950#section-2.2
181            if let Some([first_byte, second_byte]) = data.get(0..2) {
182                if (*first_byte as u16 * 256 + *second_byte as u16) % 31 == 0 {
183                    trace!("Detected Zlib compression");
184                    return Self::Zlib;
185                }
186            };
187
188            warn!(
189                "Detected Zlib magic bytes but the header is invalid: {:?}",
190                data.get(0..2)
191            );
192        };
193
194        trace!("No compression detected",);
195        Self::None
196    }
197
198    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
199        let decompressed = match self {
200            Self::Gzip => {
201                let mut decoder = MultiGzDecoder::new(data.reader());
202                let mut decompressed = Vec::new();
203                decoder
204                    .read_to_end(&mut decompressed)
205                    .context(GzipDecompressionSnafu)?;
206                Bytes::from(decompressed)
207            }
208            Self::Zlib => {
209                let mut decoder = ZlibDecoder::new(data.reader());
210                let mut decompressed = Vec::new();
211                decoder
212                    .read_to_end(&mut decompressed)
213                    .context(ZlibDecompressionSnafu)?;
214                Bytes::from(decompressed)
215            }
216            Self::None => data,
217        };
218        Ok(decompressed)
219    }
220}
221
/// Errors returned by [`ChunkedGelfDecompression::decompress`].
#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    // Reading from the Gzip decoder failed; `source` is the underlying I/O error.
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    // Reading from the Zlib decoder failed; `source` is the underlying I/O error.
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}
229
230#[derive(Debug, Snafu)]
231pub enum ChunkedGelfDecoderError {
232    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
233    InvalidChunkHeader { header: Bytes },
234    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."))]
235    InvalidTotalChunks {
236        message_id: u64,
237        sequence_number: u8,
238        total_chunks: u8,
239    },
240    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"))]
241    InvalidSequenceNumber {
242        message_id: u64,
243        sequence_number: u8,
244        total_chunks: u8,
245    },
246    #[snafu(display("Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"))]
247    PendingMessagesLimitReached {
248        message_id: u64,
249        sequence_number: u8,
250        pending_messages_limit: usize,
251    },
252    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"))]
253    TotalChunksMismatch {
254        message_id: u64,
255        sequence_number: u8,
256        original_total_chunks: u8,
257        received_total_chunks: u8,
258    },
259    #[snafu(display("Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"))]
260    MaxLengthExceed {
261        message_id: u64,
262        sequence_number: u8,
263        length: usize,
264        max_length: usize,
265    },
266    #[snafu(display("Error while decompressing message. {source}"))]
267    Decompression {
268        source: ChunkedGelfDecompressionError,
269    },
270}
271
// Every `ChunkedGelfDecoderError` variant reports `can_continue() == true`.
// NOTE(review): presumably this tells the caller it may keep decoding subsequent datagrams
// after an error — confirm against the `StreamDecodingError` trait documentation.
impl StreamDecodingError for ChunkedGelfDecoderError {
    fn can_continue(&self) -> bool {
        true
    }
}
277
278impl FramingError for ChunkedGelfDecoderError {
279    fn as_any(&self) -> &dyn Any {
280        self as &dyn Any
281    }
282}
283
/// A codec for handling GELF messages that may be chunked. The implementation is based on [Graylog's GELF documentation](https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html#GELFviaUDP)
/// and [Graylog's go-gelf library](https://github.com/Graylog2/go-gelf/blob/v1/gelf/reader.go).
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    // We have to use this decoder to read all the bytes from the buffer first and don't let tokio
    // read it buffered, as tokio FramedRead will not always call the decode method with the
    // whole message. (see https://docs.rs/tokio-util/latest/src/tokio_util/codec/framed_impl.rs.html#26).
    // This limitation is due to the fact that the GELF format does not specify the length of the
    // message, so we have to read all the bytes from the message (datagram)
    bytes_decoder: BytesDecoder,
    // Selects, for each complete message, the decompression applied by `decode_message`.
    decompression_config: ChunkedGelfDecompressionConfig,
    // Pending (incomplete) messages keyed by message id. The map lives behind `Arc<Mutex<..>>`
    // because the timeout tasks spawned in `decode_chunk` hold a clone of it; clones of the
    // decoder share the same map as well.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    // Time a pending message is kept before its timeout task removes it from `state`.
    timeout: Duration,
    // Optional bound on the number of entries in `state`.
    pending_messages_limit: Option<usize>,
    // Optional bound, in bytes, on the accumulated payload of a single message.
    max_length: Option<usize>,
}
300
301impl ChunkedGelfDecoder {
302    /// Creates a new `ChunkedGelfDecoder`.
303    pub fn new(
304        timeout_secs: f64,
305        pending_messages_limit: Option<usize>,
306        max_length: Option<usize>,
307        decompression_config: ChunkedGelfDecompressionConfig,
308    ) -> Self {
309        Self {
310            bytes_decoder: BytesDecoder::new(),
311            decompression_config,
312            state: Arc::new(Mutex::new(HashMap::new())),
313            timeout: Duration::from_secs_f64(timeout_secs),
314            pending_messages_limit,
315            max_length,
316        }
317    }
318
319    /// Decode a GELF chunk
320    pub fn decode_chunk(
321        &mut self,
322        mut chunk: Bytes,
323    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
324        // Encoding scheme:
325        //
326        // +------------+-----------------+--------------+----------------------+
327        // | Message id | Sequence number | Total chunks |    Chunk payload     |
328        // +------------+-----------------+--------------+----------------------+
329        // | 64 bits    | 8 bits          | 8 bits       | remaining bits       |
330        // +------------+-----------------+--------------+----------------------+
331        //
332        // As this codec is oriented for UDP, the chunks (datagrams) are not guaranteed to be received in order,
333        // nor to be received at all. So, we have to store the chunks in a buffer (state field) until we receive
334        // all the chunks of a message. When we receive all the chunks of a message, we can concatenate them
335        // and return the complete payload.
336
337        // We need 10 bytes to read the message id, sequence number and total chunks
338        ensure!(
339            chunk.remaining() >= 10,
340            InvalidChunkHeaderSnafu { header: chunk }
341        );
342
343        let message_id = chunk.get_u64();
344        let sequence_number = chunk.get_u8();
345        let total_chunks = chunk.get_u8();
346
347        ensure!(
348            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
349            InvalidTotalChunksSnafu {
350                message_id,
351                sequence_number,
352                total_chunks
353            }
354        );
355
356        ensure!(
357            sequence_number < total_chunks,
358            InvalidSequenceNumberSnafu {
359                message_id,
360                sequence_number,
361                total_chunks
362            }
363        );
364
365        let mut state_lock = self.state.lock().expect("poisoned lock");
366
367        if let Some(pending_messages_limit) = self.pending_messages_limit {
368            ensure!(
369                state_lock.len() < pending_messages_limit,
370                PendingMessagesLimitReachedSnafu {
371                    message_id,
372                    sequence_number,
373                    pending_messages_limit
374                }
375            );
376        }
377
378        let message_state = state_lock.entry(message_id).or_insert_with(|| {
379            // We need to spawn a task that will clear the message state after a certain time
380            // otherwise we will have a memory leak due to messages that never complete
381            let state = Arc::clone(&self.state);
382            let timeout = self.timeout;
383            let timeout_handle = tokio::spawn(async move {
384                tokio::time::sleep(timeout).await;
385                let mut state_lock = state.lock().expect("poisoned lock");
386                if state_lock.remove(&message_id).is_some() {
387                    warn!(
388                        message_id = message_id,
389                        timeout_secs = timeout.as_secs_f64(),
390                        internal_log_rate_limit = true,
391                        "Message was not fully received within the timeout window. Discarding it."
392                    );
393                }
394            });
395            MessageState::new(total_chunks, timeout_handle)
396        });
397
398        ensure!(
399            message_state.total_chunks == total_chunks,
400            TotalChunksMismatchSnafu {
401                message_id,
402                sequence_number,
403                original_total_chunks: message_state.total_chunks,
404                received_total_chunks: total_chunks
405            }
406        );
407
408        if message_state.is_chunk_present(sequence_number) {
409            debug!(
410                message_id = message_id,
411                sequence_number = sequence_number,
412                internal_log_rate_limit = true,
413                "Received a duplicate chunk. Ignoring it."
414            );
415            return Ok(None);
416        }
417
418        message_state.add_chunk(sequence_number, chunk);
419
420        if let Some(max_length) = self.max_length {
421            let length = message_state.current_length();
422            if length > max_length {
423                state_lock.remove(&message_id);
424                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
425                    message_id,
426                    sequence_number,
427                    length,
428                    max_length,
429                });
430            }
431        }
432
433        if let Some(message) = message_state.retrieve_message() {
434            state_lock.remove(&message_id);
435            Ok(Some(message))
436        } else {
437            Ok(None)
438        }
439    }
440
441    /// Decode a GELF message that may be chunked or not. The source bytes are expected to be
442    /// datagram-based (or message-based), so it must not contain multiple GELF messages
443    /// delimited by '\0', such as it would be in a stream-based protocol.
444    pub fn decode_message(
445        &mut self,
446        mut src: Bytes,
447    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
448        let message = if src.starts_with(GELF_MAGIC) {
449            trace!("Received a chunked GELF message based on the magic bytes");
450            src.advance(2);
451            self.decode_chunk(src)?
452        } else {
453            trace!(
454                "Received an unchunked GELF message. First two bytes of message: {:?}",
455                &src[0..2]
456            );
457            Some(src)
458        };
459
460        // We can have both chunked and unchunked messages that are compressed
461        message
462            .map(|message| {
463                self.decompression_config
464                    .get_decompression(&message)
465                    .decompress(message)
466                    .context(DecompressionSnafu)
467            })
468            .transpose()
469    }
470}
471
472impl Default for ChunkedGelfDecoder {
473    fn default() -> Self {
474        Self::new(
475            DEFAULT_TIMEOUT_SECS,
476            None,
477            None,
478            ChunkedGelfDecompressionConfig::Auto,
479        )
480    }
481}
482
483impl Decoder for ChunkedGelfDecoder {
484    type Item = Bytes;
485
486    type Error = BoxedFramingError;
487
488    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
489        if src.is_empty() {
490            return Ok(None);
491        }
492
493        Ok(self
494            .bytes_decoder
495            .decode(src)?
496            .and_then(|frame| self.decode_message(frame).transpose())
497            .transpose()?)
498    }
499    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
500        if buf.is_empty() {
501            return Ok(None);
502        }
503
504        Ok(self
505            .bytes_decoder
506            .decode_eof(buf)?
507            .and_then(|frame| self.decode_message(frame).transpose())
508            .transpose()?)
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515    use bytes::{BufMut, BytesMut};
516    use flate2::{write::GzEncoder, write::ZlibEncoder};
517    use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
518    use rstest::{fixture, rstest};
519    use std::fmt::Write as FmtWrite;
520    use std::io::Write as IoWrite;
521    use tracing_test::traced_test;
522
    // Compression algorithms the tests can apply to a payload before feeding it to the decoder.
    pub enum Compression {
        // Compress with `flate2`'s `GzEncoder`.
        Gzip,
        // Compress with `flate2`'s `ZlibEncoder`.
        Zlib,
    }
527
528    impl Compression {
529        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
530            self.compress_with_level(payload, flate2::Compression::default())
531        }
532
533        pub fn compress_with_level(
534            &self,
535            payload: &impl AsRef<[u8]>,
536            level: flate2::Compression,
537        ) -> Bytes {
538            match self {
539                Compression::Gzip => {
540                    let mut encoder = GzEncoder::new(Vec::new(), level);
541                    encoder
542                        .write_all(payload.as_ref())
543                        .expect("failed to write to encoder");
544                    encoder.finish().expect("failed to finish encoder").into()
545                }
546                Compression::Zlib => {
547                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
548                    encoder
549                        .write_all(payload.as_ref())
550                        .expect("failed to write to encoder");
551                    encoder.finish().expect("failed to finish encoder").into()
552                }
553            }
554        }
555    }
556
557    fn create_chunk(
558        message_id: u64,
559        sequence_number: u8,
560        total_chunks: u8,
561        payload: &impl AsRef<[u8]>,
562    ) -> BytesMut {
563        let mut chunk = BytesMut::new();
564        chunk.put_slice(GELF_MAGIC);
565        chunk.put_u64(message_id);
566        chunk.put_u8(sequence_number);
567        chunk.put_u8(total_chunks);
568        chunk.extend_from_slice(payload.as_ref());
569        chunk
570    }
571
572    #[fixture]
573    fn unchunked_message() -> (BytesMut, String) {
574        let payload = "foo";
575        (BytesMut::from(payload), payload.to_string())
576    }
577
578    #[fixture]
579    fn two_chunks_message() -> ([BytesMut; 2], String) {
580        let message_id = 1u64;
581        let total_chunks = 2u8;
582
583        let first_sequence_number = 0u8;
584        let first_payload = "foo";
585        let first_chunk = create_chunk(
586            message_id,
587            first_sequence_number,
588            total_chunks,
589            &first_payload,
590        );
591
592        let second_sequence_number = 1u8;
593        let second_payload = "bar";
594        let second_chunk = create_chunk(
595            message_id,
596            second_sequence_number,
597            total_chunks,
598            &second_payload,
599        );
600
601        (
602            [first_chunk, second_chunk],
603            format!("{first_payload}{second_payload}"),
604        )
605    }
606
607    #[fixture]
608    fn three_chunks_message() -> ([BytesMut; 3], String) {
609        let message_id = 2u64;
610        let total_chunks = 3u8;
611
612        let first_sequence_number = 0u8;
613        let first_payload = "foo";
614        let first_chunk = create_chunk(
615            message_id,
616            first_sequence_number,
617            total_chunks,
618            &first_payload,
619        );
620
621        let second_sequence_number = 1u8;
622        let second_payload = "bar";
623        let second_chunk = create_chunk(
624            message_id,
625            second_sequence_number,
626            total_chunks,
627            &second_payload,
628        );
629
630        let third_sequence_number = 2u8;
631        let third_payload = "baz";
632        let third_chunk = create_chunk(
633            message_id,
634            third_sequence_number,
635            total_chunks,
636            &third_payload,
637        );
638
639        (
640            [first_chunk, second_chunk, third_chunk],
641            format!("{first_payload}{second_payload}{third_payload}"),
642        )
643    }
644
645    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
646        error
647            .as_any()
648            .downcast_ref::<ChunkedGelfDecoderError>()
649            .expect("Expected ChunkedGelfDecoderError to be downcasted")
650    }
651
652    #[rstest]
653    #[tokio::test]
654    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
655        let (mut chunks, expected_message) = two_chunks_message;
656        let mut decoder = ChunkedGelfDecoder::default();
657
658        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
659        assert!(frame.is_none());
660
661        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
662        assert_eq!(frame, Some(Bytes::from(expected_message)));
663    }
664
665    #[rstest]
666    #[tokio::test]
667    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
668        let (mut message, expected_message) = unchunked_message;
669        let mut decoder = ChunkedGelfDecoder::default();
670
671        let frame = decoder.decode_eof(&mut message).unwrap();
672        assert_eq!(frame, Some(Bytes::from(expected_message)));
673    }
674
675    #[rstest]
676    #[tokio::test]
677    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
678        let (mut chunks, expected_message) = two_chunks_message;
679        let mut decoder = ChunkedGelfDecoder::default();
680
681        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
682        assert!(frame.is_none());
683
684        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
685        assert_eq!(frame, Some(Bytes::from(expected_message)));
686    }
687
688    #[rstest]
689    #[tokio::test]
690    async fn decode_unordered_messages(
691        two_chunks_message: ([BytesMut; 2], String),
692        three_chunks_message: ([BytesMut; 3], String),
693    ) {
694        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
695        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
696        let mut decoder = ChunkedGelfDecoder::default();
697
698        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
699        assert!(frame.is_none());
700
701        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
702        assert!(frame.is_none());
703
704        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
705        assert!(frame.is_none());
706
707        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
708        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));
709
710        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
711        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
712    }
713
714    #[rstest]
715    #[tokio::test]
716    async fn decode_mixed_chunked_and_unchunked_messages(
717        unchunked_message: (BytesMut, String),
718        two_chunks_message: ([BytesMut; 2], String),
719    ) {
720        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
721        let (mut chunks, expected_chunked_message) = two_chunks_message;
722        let mut decoder = ChunkedGelfDecoder::default();
723
724        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
725        assert!(frame.is_none());
726
727        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
728        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));
729
730        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
731        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
732    }
733
734    #[tokio::test]
735    async fn decode_shuffled_messages() {
736        let mut rng = SmallRng::seed_from_u64(42);
737        let total_chunks = 100u8;
738        let first_message_id = 1u64;
739        let first_payload = "first payload";
740        let second_message_id = 2u64;
741        let second_payload = "second payload";
742        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
743            create_chunk(
744                first_message_id,
745                sequence_number,
746                total_chunks,
747                &first_payload,
748            )
749        });
750        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
751            create_chunk(
752                second_message_id,
753                sequence_number,
754                total_chunks,
755                &second_payload,
756            )
757        });
758        let expected_first_message = first_payload.repeat(total_chunks as usize);
759        let expected_second_message = second_payload.repeat(total_chunks as usize);
760        let mut merged_chunks = first_message_chunks
761            .chain(second_message_chunks)
762            .collect::<Vec<_>>();
763        merged_chunks.shuffle(&mut rng);
764        let mut decoder = ChunkedGelfDecoder::default();
765
766        let mut count = 0;
767        let first_retrieved_message = loop {
768            assert!(count < 2 * total_chunks as usize);
769            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
770                break message;
771            } else {
772                count += 1;
773            }
774        };
775        let second_retrieved_message = loop {
776            assert!(count < 2 * total_chunks as usize);
777            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
778                break message;
779            } else {
780                count += 1
781            }
782        };
783
784        assert_eq!(second_retrieved_message, expected_first_message);
785        assert_eq!(first_retrieved_message, expected_second_message);
786    }
787
788    #[rstest]
789    #[tokio::test(start_paused = true)]
790    #[traced_test]
791    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
792        let (mut chunks, _) = two_chunks_message;
793        let mut decoder = ChunkedGelfDecoder::default();
794
795        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
796        assert!(frame.is_none());
797        assert!(!decoder.state.lock().unwrap().is_empty());
798
799        // The message state should be cleared after a certain time
800        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
801        assert!(decoder.state.lock().unwrap().is_empty());
802        assert!(logs_contain(
803            "Message was not fully received within the timeout window. Discarding it."
804        ));
805
806        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
807        assert!(frame.is_none());
808
809        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
810        assert!(decoder.state.lock().unwrap().is_empty());
811        assert!(logs_contain(
812            "Message was not fully received within the timeout window. Discarding it"
813        ));
814    }
815
816    #[tokio::test]
817    async fn decode_empty_input() {
818        let mut src = BytesMut::new();
819        let mut decoder = ChunkedGelfDecoder::default();
820
821        let frame = decoder.decode_eof(&mut src).unwrap();
822        assert!(frame.is_none());
823    }
824
825    #[tokio::test]
826    async fn decode_chunk_with_invalid_header() {
827        let mut src = BytesMut::new();
828        src.extend_from_slice(GELF_MAGIC);
829        // Invalid chunk header with less than 10 bytes
830        let invalid_chunk = [0x12, 0x34];
831        src.extend_from_slice(&invalid_chunk);
832        let mut decoder = ChunkedGelfDecoder::default();
833        let frame = decoder.decode_eof(&mut src);
834
835        let error = frame.unwrap_err();
836        let downcasted_error = downcast_framing_error(&error);
837        assert!(matches!(
838            downcasted_error,
839            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
840        ));
841    }
842
843    #[tokio::test]
844    async fn decode_chunk_with_invalid_total_chunks() {
845        let message_id = 1u64;
846        let sequence_number = 1u8;
847        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
848        let payload = "foo";
849        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
850        let mut decoder = ChunkedGelfDecoder::default();
851
852        let frame = decoder.decode_eof(&mut chunk);
853        let error = frame.unwrap_err();
854        let downcasted_error = downcast_framing_error(&error);
855        assert!(matches!(
856            downcasted_error,
857            ChunkedGelfDecoderError::InvalidTotalChunks {
858                message_id: 1,
859                sequence_number: 1,
860                total_chunks: 129,
861            }
862        ));
863    }
864
865    #[tokio::test]
866    async fn decode_chunk_with_invalid_sequence_number() {
867        let message_id = 1u64;
868        let total_chunks = 2u8;
869        let invalid_sequence_number = total_chunks + 1;
870        let payload = "foo";
871        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
872        let mut decoder = ChunkedGelfDecoder::default();
873
874        let frame = decoder.decode_eof(&mut chunk);
875        let error = frame.unwrap_err();
876        let downcasted_error = downcast_framing_error(&error);
877        assert!(matches!(
878            downcasted_error,
879            ChunkedGelfDecoderError::InvalidSequenceNumber {
880                message_id: 1,
881                sequence_number: 3,
882                total_chunks: 2,
883            }
884        ));
885    }
886
887    #[rstest]
888    #[tokio::test]
889    async fn decode_reached_pending_messages_limit(
890        two_chunks_message: ([BytesMut; 2], String),
891        three_chunks_message: ([BytesMut; 3], String),
892    ) {
893        let (mut two_chunks, _) = two_chunks_message;
894        let (mut three_chunks, _) = three_chunks_message;
895        let mut decoder = ChunkedGelfDecoder {
896            pending_messages_limit: Some(1),
897            ..Default::default()
898        };
899
900        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
901        assert!(frame.is_none());
902        assert!(decoder.state.lock().unwrap().len() == 1);
903
904        let frame = decoder.decode_eof(&mut three_chunks[0]);
905        let error = frame.unwrap_err();
906        let downcasted_error = downcast_framing_error(&error);
907        assert!(matches!(
908            downcasted_error,
909            ChunkedGelfDecoderError::PendingMessagesLimitReached {
910                message_id: 2u64,
911                sequence_number: 0u8,
912                pending_messages_limit: 1,
913            }
914        ));
915        assert!(decoder.state.lock().unwrap().len() == 1);
916    }
917
918    #[rstest]
919    #[tokio::test]
920    async fn decode_chunk_with_different_total_chunks() {
921        let message_id = 1u64;
922        let sequence_number = 0u8;
923        let total_chunks = 2u8;
924        let payload = "foo";
925        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
926        let mut second_chunk =
927            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
928        let mut decoder = ChunkedGelfDecoder::default();
929
930        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
931        assert!(frame.is_none());
932
933        let frame = decoder.decode_eof(&mut second_chunk);
934        let error = frame.unwrap_err();
935        let downcasted_error = downcast_framing_error(&error);
936        assert!(matches!(
937            downcasted_error,
938            ChunkedGelfDecoderError::TotalChunksMismatch {
939                message_id: 1,
940                sequence_number: 1,
941                original_total_chunks: 2,
942                received_total_chunks: 3,
943            }
944        ));
945    }
946
947    #[rstest]
948    #[tokio::test]
949    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
950        let (mut chunks, _) = two_chunks_message;
951        let mut decoder = ChunkedGelfDecoder {
952            max_length: Some(5),
953            ..Default::default()
954        };
955
956        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
957        assert!(frame.is_none());
958        let frame = decoder.decode_eof(&mut chunks[1]);
959        let error = frame.unwrap_err();
960        let downcasted_error = downcast_framing_error(&error);
961        assert!(matches!(
962            downcasted_error,
963            ChunkedGelfDecoderError::MaxLengthExceed {
964                message_id: 1,
965                sequence_number: 1,
966                length: 6,
967                max_length: 5,
968            }
969        ));
970        assert_eq!(decoder.state.lock().unwrap().len(), 0);
971    }
972
973    #[rstest]
974    #[tokio::test]
975    #[traced_test]
976    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
977        let (mut chunks, _) = two_chunks_message;
978        let mut decoder = ChunkedGelfDecoder::default();
979
980        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
981        assert!(frame.is_none());
982
983        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
984        assert!(frame.is_none());
985        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
986    }
987
988    #[tokio::test]
989    #[rstest]
990    #[case::gzip(Compression::Gzip)]
991    #[case::zlib(Compression::Zlib)]
992    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
993        let payload = (0..100).fold(String::new(), |mut payload, n| {
994            write!(payload, "foo{n}").unwrap();
995            payload
996        });
997        let compressed_payload = compression.compress(&payload);
998        let mut decoder = ChunkedGelfDecoder::default();
999
1000        let frame = decoder
1001            .decode_eof(&mut compressed_payload.into())
1002            .expect("decoding should not fail")
1003            .expect("decoding should return a frame");
1004
1005        assert_eq!(frame, payload);
1006    }
1007
1008    #[tokio::test]
1009    #[rstest]
1010    #[case::gzip(Compression::Gzip)]
1011    #[case::zlib(Compression::Zlib)]
1012    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
1013        let message_id = 1u64;
1014        let max_chunk_size = 5;
1015        let payload = (0..100).fold(String::new(), |mut payload, n| {
1016            write!(payload, "foo{n}").unwrap();
1017            payload
1018        });
1019        let compressed_payload = compression.compress(&payload);
1020        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
1021        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
1022        let mut chunks = compressed_payload
1023            .chunks(max_chunk_size)
1024            .enumerate()
1025            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
1026            .collect::<Vec<_>>();
1027        let (last_chunk, first_chunks) =
1028            chunks.split_last_mut().expect("chunks should not be empty");
1029        let mut decoder = ChunkedGelfDecoder::default();
1030
1031        for chunk in first_chunks {
1032            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
1033            assert!(frame.is_none());
1034        }
1035        let frame = decoder
1036            .decode_eof(last_chunk)
1037            .expect("decoding should not fail")
1038            .expect("decoding should return a frame");
1039
1040        assert_eq!(frame, payload);
1041    }
1042
1043    #[tokio::test]
1044    async fn decode_malformed_gzip_message() {
1045        let mut compressed_payload = BytesMut::new();
1046        compressed_payload.extend(GZIP_MAGIC);
1047        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
1048        let mut decoder = ChunkedGelfDecoder::default();
1049
1050        let error = decoder
1051            .decode_eof(&mut compressed_payload)
1052            .expect_err("decoding should fail");
1053
1054        let downcasted_error = downcast_framing_error(&error);
1055        assert!(matches!(
1056            downcasted_error,
1057            ChunkedGelfDecoderError::Decompression {
1058                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1059            }
1060        ));
1061    }
1062
1063    #[tokio::test]
1064    async fn decode_malformed_zlib_message() {
1065        let mut compressed_payload = BytesMut::new();
1066        compressed_payload.extend(ZLIB_MAGIC);
1067        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
1068        let mut decoder = ChunkedGelfDecoder::default();
1069
1070        let error = decoder
1071            .decode_eof(&mut compressed_payload)
1072            .expect_err("decoding should fail");
1073
1074        let downcasted_error = downcast_framing_error(&error);
1075        assert!(matches!(
1076            downcasted_error,
1077            ChunkedGelfDecoderError::Decompression {
1078                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1079            }
1080        ));
1081    }
1082
1083    #[tokio::test]
1084    async fn decode_zlib_payload_with_zlib_decoder() {
1085        let payload = "foo";
1086        let compressed_payload = Compression::Zlib.compress(&payload);
1087        let mut decoder = ChunkedGelfDecoder {
1088            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1089            ..Default::default()
1090        };
1091
1092        let frame = decoder
1093            .decode_eof(&mut compressed_payload.into())
1094            .expect("decoding should not fail")
1095            .expect("decoding should return a frame");
1096
1097        assert_eq!(frame, payload);
1098    }
1099
1100    #[tokio::test]
1101    async fn decode_gzip_payload_with_zlib_decoder() {
1102        let payload = "foo";
1103        let compressed_payload = Compression::Gzip.compress(&payload);
1104        let mut decoder = ChunkedGelfDecoder {
1105            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1106            ..Default::default()
1107        };
1108
1109        let error = decoder
1110            .decode_eof(&mut compressed_payload.into())
1111            .expect_err("decoding should fail");
1112
1113        let downcasted_error = downcast_framing_error(&error);
1114        assert!(matches!(
1115            downcasted_error,
1116            ChunkedGelfDecoderError::Decompression {
1117                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1118            }
1119        ));
1120    }
1121
1122    #[tokio::test]
1123    async fn decode_uncompressed_payload_with_zlib_decoder() {
1124        let payload = "foo";
1125        let mut decoder = ChunkedGelfDecoder {
1126            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
1127            ..Default::default()
1128        };
1129
1130        let error = decoder
1131            .decode_eof(&mut payload.into())
1132            .expect_err("decoding should fail");
1133
1134        let downcasted_error = downcast_framing_error(&error);
1135        assert!(matches!(
1136            downcasted_error,
1137            ChunkedGelfDecoderError::Decompression {
1138                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
1139            }
1140        ));
1141    }
1142
1143    #[tokio::test]
1144    async fn decode_gzip_payload_with_gzip_decoder() {
1145        let payload = "foo";
1146        let compressed_payload = Compression::Gzip.compress(&payload);
1147        let mut decoder = ChunkedGelfDecoder {
1148            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1149            ..Default::default()
1150        };
1151
1152        let frame = decoder
1153            .decode_eof(&mut compressed_payload.into())
1154            .expect("decoding should not fail")
1155            .expect("decoding should return a frame");
1156
1157        assert_eq!(frame, payload);
1158    }
1159
1160    #[tokio::test]
1161    async fn decode_zlib_payload_with_gzip_decoder() {
1162        let payload = "foo";
1163        let compressed_payload = Compression::Zlib.compress(&payload);
1164        let mut decoder = ChunkedGelfDecoder {
1165            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1166            ..Default::default()
1167        };
1168
1169        let error = decoder
1170            .decode_eof(&mut compressed_payload.into())
1171            .expect_err("decoding should fail");
1172
1173        let downcasted_error = downcast_framing_error(&error);
1174        assert!(matches!(
1175            downcasted_error,
1176            ChunkedGelfDecoderError::Decompression {
1177                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1178            }
1179        ));
1180    }
1181
1182    #[tokio::test]
1183    async fn decode_uncompressed_payload_with_gzip_decoder() {
1184        let payload = "foo";
1185        let mut decoder = ChunkedGelfDecoder {
1186            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
1187            ..Default::default()
1188        };
1189
1190        let error = decoder
1191            .decode_eof(&mut payload.into())
1192            .expect_err("decoding should fail");
1193
1194        let downcasted_error = downcast_framing_error(&error);
1195        assert!(matches!(
1196            downcasted_error,
1197            ChunkedGelfDecoderError::Decompression {
1198                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
1199            }
1200        ));
1201    }
1202
1203    #[tokio::test]
1204    #[rstest]
1205    #[case::gzip(Compression::Gzip)]
1206    #[case::zlib(Compression::Zlib)]
1207    async fn decode_compressed_payload_with_no_decompression_decoder(
1208        #[case] compression: Compression,
1209    ) {
1210        let payload = "foo";
1211        let compressed_payload = compression.compress(&payload);
1212        let mut decoder = ChunkedGelfDecoder {
1213            decompression_config: ChunkedGelfDecompressionConfig::None,
1214            ..Default::default()
1215        };
1216
1217        let frame = decoder
1218            .decode_eof(&mut compressed_payload.clone().into())
1219            .expect("decoding should not fail")
1220            .expect("decoding should return a frame");
1221
1222        assert_eq!(frame, compressed_payload);
1223    }
1224
1225    #[test]
1226    fn detect_gzip_compression() {
1227        let payload = "foo";
1228
1229        for level in 0..=9 {
1230            let level = flate2::Compression::new(level);
1231            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
1232            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1233            assert_eq!(
1234                actual,
1235                ChunkedGelfDecompression::Gzip,
1236                "Failed for level {}",
1237                level.level()
1238            );
1239        }
1240    }
1241
1242    #[test]
1243    fn detect_zlib_compression() {
1244        let payload = "foo";
1245
1246        for level in 0..=9 {
1247            let level = flate2::Compression::new(level);
1248            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
1249            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
1250            assert_eq!(
1251                actual,
1252                ChunkedGelfDecompression::Zlib,
1253                "Failed for level {}",
1254                level.level()
1255            );
1256        }
1257    }
1258
1259    #[test]
1260    fn detect_no_compression() {
1261        let payload = "foo";
1262
1263        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());
1264
1265        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
1266    }
1267}