use super::{BoxedFramingError, FramingError};
use crate::{BytesDecoder, StreamDecodingError};
use bytes::{Buf, Bytes, BytesMut};
use derivative::Derivative;
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use snafu::{ensure, ResultExt, Snafu};
use std::any::Any;
use std::collections::HashMap;
use std::io::Read;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::codec::Decoder;
use tracing::{debug, trace, warn};
use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
use vector_config::configurable_component;

const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;
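
// For reference, the layout of a chunked GELF datagram as parsed by `decode_chunk` below:
//
//   bytes 0..2    GELF magic bytes (0x1e, 0x0f)
//   bytes 2..10   message id (big-endian u64)
//   byte  10      sequence number (zero-based)
//   byte  11      total number of chunks (1..=GELF_MAX_TOTAL_CHUNKS)
//   bytes 12..    chunk payload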

const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}

/// Config used to build a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}

impl ChunkedGelfDecoderConfig {
    /// Builds the `ChunkedGelfDecoder` from this configuration.
    pub fn build(&self) -> ChunkedGelfDecoder {
        ChunkedGelfDecoder::new(
            self.chunked_gelf.timeout_secs,
            self.chunked_gelf.pending_messages_limit,
            self.chunked_gelf.max_length,
            self.chunked_gelf.decompression,
        )
    }
}

/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a message to be fully received. If the timeout is reached
    /// before all of its chunks have been received, the incomplete message is discarded.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending, incomplete messages. If this limit is reached, the decoder
    /// rejects further chunks until pending messages complete or time out, which bounds the
    /// memory used by the decoder's state. If unset, no limit is enforced.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length, in bytes, of a fully assembled message. If a message exceeds this
    /// length, it is dropped along with all of its buffered chunks. If unset, no limit is
    /// enforced.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// Decompression configuration for the received messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}

/// Decompression options for chunked GELF messages.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the compression from the magic bytes of the message.
    #[derivative(Default)]
    Auto,

    /// Use Gzip decompression.
    Gzip,

    /// Use Zlib decompression.
    Zlib,

    /// Do not decompress the message.
    None,
}

impl ChunkedGelfDecompressionConfig {
    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
        match self {
            Self::Auto => ChunkedGelfDecompression::from_magic(data),
            Self::Gzip => ChunkedGelfDecompression::Gzip,
            Self::Zlib => ChunkedGelfDecompression::Zlib,
            Self::None => ChunkedGelfDecompression::None,
        }
    }
}

#[derive(Debug)]
struct MessageState {
    total_chunks: u8,
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    // Bitmap where bit `n` is set once the chunk with sequence number `n` has been received.
    chunks_bitmap: u128,
    current_length: usize,
    timeout_task: JoinHandle<()>,
}

impl MessageState {
    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
        Self {
            total_chunks,
            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
            chunks_bitmap: 0,
            current_length: 0,
            timeout_task,
        }
    }

    fn is_chunk_present(&self, sequence_number: u8) -> bool {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap & chunk_bitmap_id != 0
    }

    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap |= chunk_bitmap_id;
        self.current_length += chunk.remaining();
        self.chunks[sequence_number as usize] = chunk;
    }

    fn is_complete(&self) -> bool {
        self.chunks_bitmap.count_ones() == self.total_chunks as u32
    }

    fn current_length(&self) -> usize {
        self.current_length
    }

    fn retrieve_message(&self) -> Option<Bytes> {
        if self.is_complete() {
            self.timeout_task.abort();
            let chunks = &self.chunks[0..self.total_chunks as usize];
            let mut message = BytesMut::new();
            for chunk in chunks {
                message.extend_from_slice(chunk);
            }
            Some(message.freeze())
        } else {
            None
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    Gzip,
    Zlib,
    None,
}

impl ChunkedGelfDecompression {
    pub fn from_magic(data: &Bytes) -> Self {
        if data.starts_with(GZIP_MAGIC) {
            trace!("Detected Gzip compression");
            return Self::Gzip;
        }

        if data.starts_with(ZLIB_MAGIC) {
            // A valid zlib stream also has to satisfy (CMF * 256 + FLG) % 31 == 0 (RFC 1950).
            if let Some([first_byte, second_byte]) = data.get(0..2) {
                if (*first_byte as u16 * 256 + *second_byte as u16) % 31 == 0 {
                    trace!("Detected Zlib compression");
                    return Self::Zlib;
                }
            }

            warn!(
                "Detected Zlib magic bytes but the header is invalid: {:?}",
                data.get(0..2)
            );
        }

        trace!("No compression detected");
        Self::None
    }

    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
        let decompressed = match self {
            Self::Gzip => {
                let mut decoder = MultiGzDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(GzipDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::Zlib => {
                let mut decoder = ZlibDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(ZlibDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::None => data,
        };
        Ok(decompressed)
    }
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display("Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    #[snafu(display("Received chunk with message id {message_id} and sequence number {sequence_number} has a different total chunks value: the original value is {original_total_chunks} and the received value is {received_total_chunks}"))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    #[snafu(display("Message with id {message_id} has exceeded the maximum message length: got {length} bytes and the maximum is {max_length} bytes. Discarding all buffered chunks of that message"))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    #[snafu(display("Error while decompressing message: {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}

impl StreamDecodingError for ChunkedGelfDecoderError {
    fn can_continue(&self) -> bool {
        true
    }
}

impl FramingError for ChunkedGelfDecoderError {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}

/// A decoder for handling GELF messages that may be chunked.
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    /// The underlying bytes decoder that produces byte frames from the incoming stream.
    bytes_decoder: BytesDecoder,
    decompression_config: ChunkedGelfDecompressionConfig,
    /// Pending, incomplete messages keyed by message id. Shared with the per-message timeout
    /// tasks so they can discard expired entries.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    timeout: Duration,
    pending_messages_limit: Option<usize>,
    max_length: Option<usize>,
}

impl ChunkedGelfDecoder {
    /// Creates a new `ChunkedGelfDecoder`.
    pub fn new(
        timeout_secs: f64,
        pending_messages_limit: Option<usize>,
        max_length: Option<usize>,
        decompression_config: ChunkedGelfDecompressionConfig,
    ) -> Self {
        Self {
            bytes_decoder: BytesDecoder::new(),
            decompression_config,
            state: Arc::new(Mutex::new(HashMap::new())),
            timeout: Duration::from_secs_f64(timeout_secs),
            pending_messages_limit,
            max_length,
        }
    }

    /// Decodes a GELF chunk. Returns the fully assembled message once all of its chunks have
    /// been received, or `None` while the message is still incomplete.
    pub fn decode_chunk(
        &mut self,
        mut chunk: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
        // The 10-byte header following the magic bytes: message id (8 bytes), sequence
        // number (1 byte) and total chunks (1 byte).
        ensure!(
            chunk.remaining() >= 10,
            InvalidChunkHeaderSnafu { header: chunk }
        );

        let message_id = chunk.get_u64();
        let sequence_number = chunk.get_u8();
        let total_chunks = chunk.get_u8();

        ensure!(
            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
            InvalidTotalChunksSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        ensure!(
            sequence_number < total_chunks,
            InvalidSequenceNumberSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        let mut state_lock = self.state.lock().expect("poisoned lock");

        if let Some(pending_messages_limit) = self.pending_messages_limit {
            ensure!(
                state_lock.len() < pending_messages_limit,
                PendingMessagesLimitReachedSnafu {
                    message_id,
                    sequence_number,
                    pending_messages_limit
                }
            );
        }

        let message_state = state_lock.entry(message_id).or_insert_with(|| {
            // Spawn a task that clears the message state if the message is not fully
            // received within the timeout window.
            let state = Arc::clone(&self.state);
            let timeout = self.timeout;
            let timeout_handle = tokio::spawn(async move {
                tokio::time::sleep(timeout).await;
                let mut state_lock = state.lock().expect("poisoned lock");
                if state_lock.remove(&message_id).is_some() {
                    warn!(
                        message_id = message_id,
                        timeout_secs = timeout.as_secs_f64(),
                        internal_log_rate_limit = true,
                        "Message was not fully received within the timeout window. Discarding it."
                    );
                }
            });
            MessageState::new(total_chunks, timeout_handle)
        });

        ensure!(
            message_state.total_chunks == total_chunks,
            TotalChunksMismatchSnafu {
                message_id,
                sequence_number,
                original_total_chunks: message_state.total_chunks,
                received_total_chunks: total_chunks
            }
        );

        if message_state.is_chunk_present(sequence_number) {
            debug!(
                message_id = message_id,
                sequence_number = sequence_number,
                internal_log_rate_limit = true,
                "Received a duplicate chunk. Ignoring it."
            );
            return Ok(None);
        }

        message_state.add_chunk(sequence_number, chunk);

        if let Some(max_length) = self.max_length {
            let length = message_state.current_length();
            if length > max_length {
                state_lock.remove(&message_id);
                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
                    message_id,
                    sequence_number,
                    length,
                    max_length,
                });
            }
        }

        if let Some(message) = message_state.retrieve_message() {
            state_lock.remove(&message_id);
            Ok(Some(message))
        } else {
            Ok(None)
        }
    }

    /// Decodes a GELF message that may be chunked. The assembled message is decompressed
    /// according to the configured decompression before being returned.
    pub fn decode_message(
        &mut self,
        mut src: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
        let message = if src.starts_with(GELF_MAGIC) {
            trace!("Received a chunked GELF message based on the magic bytes");
            src.advance(2);
            self.decode_chunk(src)?
        } else {
            trace!(
                "Received an unchunked GELF message. First two bytes of message: {:?}",
                src.get(0..2)
            );
            Some(src)
        };

        message
            .map(|message| {
                self.decompression_config
                    .get_decompression(&message)
                    .decompress(message)
                    .context(DecompressionSnafu)
            })
            .transpose()
    }
}

impl Default for ChunkedGelfDecoder {
    fn default() -> Self {
        Self::new(
            DEFAULT_TIMEOUT_SECS,
            None,
            None,
            ChunkedGelfDecompressionConfig::Auto,
        )
    }
}

impl Decoder for ChunkedGelfDecoder {
    type Item = Bytes;

    type Error = BoxedFramingError;

    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode(src)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }

    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode_eof(buf)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, BytesMut};
    use flate2::{write::GzEncoder, write::ZlibEncoder};
    use rand::{rngs::SmallRng, seq::SliceRandom, SeedableRng};
    use rstest::{fixture, rstest};
    use std::fmt::Write as FmtWrite;
    use std::io::Write as IoWrite;
    use tracing_test::traced_test;

    pub enum Compression {
        Gzip,
        Zlib,
    }

    impl Compression {
        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
            self.compress_with_level(payload, flate2::Compression::default())
        }

        pub fn compress_with_level(
            &self,
            payload: &impl AsRef<[u8]>,
            level: flate2::Compression,
        ) -> Bytes {
            match self {
                Compression::Gzip => {
                    let mut encoder = GzEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
                Compression::Zlib => {
                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
            }
        }
    }

    fn create_chunk(
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
        payload: &impl AsRef<[u8]>,
    ) -> BytesMut {
        let mut chunk = BytesMut::new();
        chunk.put_slice(GELF_MAGIC);
        chunk.put_u64(message_id);
        chunk.put_u8(sequence_number);
        chunk.put_u8(total_chunks);
        chunk.extend_from_slice(payload.as_ref());
        chunk
    }

    #[fixture]
    fn unchunked_message() -> (BytesMut, String) {
        let payload = "foo";
        (BytesMut::from(payload), payload.to_string())
    }

    #[fixture]
    fn two_chunks_message() -> ([BytesMut; 2], String) {
        let message_id = 1u64;
        let total_chunks = 2u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        (
            [first_chunk, second_chunk],
            format!("{first_payload}{second_payload}"),
        )
    }

    #[fixture]
    fn three_chunks_message() -> ([BytesMut; 3], String) {
        let message_id = 2u64;
        let total_chunks = 3u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        let third_sequence_number = 2u8;
        let third_payload = "baz";
        let third_chunk = create_chunk(
            message_id,
            third_sequence_number,
            total_chunks,
            &third_payload,
        );

        (
            [first_chunk, second_chunk, third_chunk],
            format!("{first_payload}{second_payload}{third_payload}"),
        )
    }

    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
        error
            .as_any()
            .downcast_ref::<ChunkedGelfDecoderError>()
            .expect("Expected ChunkedGelfDecoderError to be downcasted")
    }
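
    // A minimal sketch exercising the `MessageState` bookkeeping in isolation; the dummy
    // timeout task spawned here exists only to satisfy the constructor.
    #[tokio::test]
    async fn message_state_tracks_chunks() {
        let timeout_task = tokio::spawn(async {});
        let mut message_state = MessageState::new(2, timeout_task);

        message_state.add_chunk(1, Bytes::from_static(b"bar"));
        assert!(message_state.is_chunk_present(1));
        assert!(!message_state.is_chunk_present(0));
        assert!(!message_state.is_complete());
        assert_eq!(message_state.retrieve_message(), None);

        message_state.add_chunk(0, Bytes::from_static(b"foo"));
        assert!(message_state.is_complete());
        assert_eq!(message_state.current_length(), 6);
        assert_eq!(
            message_state.retrieve_message(),
            Some(Bytes::from_static(b"foobar"))
        );
    }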

    #[rstest]
    #[tokio::test]
    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
        let (mut message, expected_message) = unchunked_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_messages(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));

        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_mixed_chunked_and_unchunked_messages(
        unchunked_message: (BytesMut, String),
        two_chunks_message: ([BytesMut; 2], String),
    ) {
        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
        let (mut chunks, expected_chunked_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
    }

    #[tokio::test]
    async fn decode_shuffled_messages() {
        let mut rng = SmallRng::seed_from_u64(42);
        let total_chunks = 100u8;
        let first_message_id = 1u64;
        let first_payload = "first payload";
        let second_message_id = 2u64;
        let second_payload = "second payload";
        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                first_message_id,
                sequence_number,
                total_chunks,
                &first_payload,
            )
        });
        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                second_message_id,
                sequence_number,
                total_chunks,
                &second_payload,
            )
        });
        let expected_first_message = first_payload.repeat(total_chunks as usize);
        let expected_second_message = second_payload.repeat(total_chunks as usize);
        let mut merged_chunks = first_message_chunks
            .chain(second_message_chunks)
            .collect::<Vec<_>>();
        merged_chunks.shuffle(&mut rng);
        let mut decoder = ChunkedGelfDecoder::default();

        let mut count = 0;
        let first_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };
        let second_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };

        assert_eq!(second_retrieved_message, expected_first_message);
        assert_eq!(first_retrieved_message, expected_second_message);
    }

    #[rstest]
    #[tokio::test(start_paused = true)]
    #[traced_test]
    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(!decoder.state.lock().unwrap().is_empty());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it."
        ));

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it"
        ));
    }

    #[tokio::test]
    async fn decode_empty_input() {
        let mut src = BytesMut::new();
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut src).unwrap();
        assert!(frame.is_none());
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_header() {
        let mut src = BytesMut::new();
        src.extend_from_slice(GELF_MAGIC);
        let invalid_chunk = [0x12, 0x34];
        src.extend_from_slice(&invalid_chunk);
        let mut decoder = ChunkedGelfDecoder::default();
        let frame = decoder.decode_eof(&mut src);

        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 1u8;
        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidTotalChunks {
                message_id: 1,
                sequence_number: 1,
                total_chunks: 129,
            }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_sequence_number() {
        let message_id = 1u64;
        let total_chunks = 2u8;
        let invalid_sequence_number = total_chunks + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidSequenceNumber {
                message_id: 1,
                sequence_number: 3,
                total_chunks: 2,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_reached_pending_messages_limit(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, _) = two_chunks_message;
        let (mut three_chunks, _) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            pending_messages_limit: Some(1),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(decoder.state.lock().unwrap().len() == 1);

        let frame = decoder.decode_eof(&mut three_chunks[0]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::PendingMessagesLimitReached {
                message_id: 2u64,
                sequence_number: 0u8,
                pending_messages_limit: 1,
            }
        ));
        assert!(decoder.state.lock().unwrap().len() == 1);
    }

    #[rstest]
    #[tokio::test]
    async fn decode_chunk_with_different_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 0u8;
        let total_chunks = 2u8;
        let payload = "foo";
        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
        let mut second_chunk =
            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut second_chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::TotalChunksMismatch {
                message_id: 1,
                sequence_number: 1,
                original_total_chunks: 2,
                received_total_chunks: 3,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            max_length: Some(5),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        let frame = decoder.decode_eof(&mut chunks[1]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::MaxLengthExceed {
                message_id: 1,
                sequence_number: 1,
                length: 6,
                max_length: 5,
            }
        ));
        assert_eq!(decoder.state.lock().unwrap().len(), 0);
    }

    #[rstest]
    #[tokio::test]
    #[traced_test]
    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
        let message_id = 1u64;
        let max_chunk_size = 5;
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
        let mut chunks = compressed_payload
            .chunks(max_chunk_size)
            .enumerate()
            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
            .collect::<Vec<_>>();
        let (last_chunk, first_chunks) =
            chunks.split_last_mut().expect("chunks should not be empty");
        let mut decoder = ChunkedGelfDecoder::default();

        for chunk in first_chunks {
            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
            assert!(frame.is_none());
        }
        let frame = decoder
            .decode_eof(last_chunk)
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_malformed_gzip_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(GZIP_MAGIC);
        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_malformed_zlib_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(ZLIB_MAGIC);
        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_zlib_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_gzip_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_payload_with_no_decompression_decoder(
        #[case] compression: Compression,
    ) {
        let payload = "foo";
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::None,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.clone().into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, compressed_payload);
    }

    #[test]
    fn detect_gzip_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Gzip,
                "Failed for level {}",
                level.level()
            );
        }
    }
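
    // A small sketch of how `ChunkedGelfDecompressionConfig::Auto` dispatches on the magic
    // bytes, reusing the `Compression` test helper above.
    #[test]
    fn auto_decompression_dispatches_on_magic_bytes() {
        let gzip_payload = Compression::Gzip.compress(&"foo");
        let zlib_payload = Compression::Zlib.compress(&"foo");
        let plain_payload = Bytes::from_static(b"foo");
        let config = ChunkedGelfDecompressionConfig::Auto;

        assert_eq!(
            config.get_decompression(&gzip_payload),
            ChunkedGelfDecompression::Gzip
        );
        assert_eq!(
            config.get_decompression(&zlib_payload),
            ChunkedGelfDecompression::Zlib
        );
        assert_eq!(
            config.get_decompression(&plain_payload),
            ChunkedGelfDecompression::None
        );
    }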

    #[test]
    fn detect_zlib_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Zlib,
                "Failed for level {}",
                level.level()
            );
        }
    }
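
    // A minimal sketch of the fallback path: data that starts with the zlib CMF byte (0x78)
    // but fails the RFC 1950 header check `(CMF * 256 + FLG) % 31 == 0` should not be treated
    // as zlib-compressed.
    #[test]
    fn detect_no_compression_with_invalid_zlib_header() {
        let data = Bytes::from_static(&[0x78, 0x00, 0x01, 0x02]);

        let detected_compression = ChunkedGelfDecompression::from_magic(&data);

        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
    }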

    #[test]
    fn detect_no_compression() {
        let payload = "foo";

        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());

        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
    }
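
    // A small sketch of the wiring between `ChunkedGelfDecoderConfig` and the decoder it
    // builds, relying only on the defaults defined above.
    #[test]
    fn build_decoder_from_default_config() {
        let config = ChunkedGelfDecoderConfig::default();

        let decoder = config.build();

        assert_eq!(
            decoder.timeout,
            Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS)
        );
        assert_eq!(
            decoder.decompression_config,
            ChunkedGelfDecompressionConfig::Auto
        );
        assert!(decoder.pending_messages_limit.is_none());
        assert!(decoder.max_length.is_none());
    }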
}