use std::{
    any::Any,
    collections::HashMap,
    io::Read,
    sync::{Arc, Mutex},
    time::Duration,
};

use bytes::{Buf, Bytes, BytesMut};
use derivative::Derivative;
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use snafu::{ResultExt, Snafu, ensure};
use tokio::{self, task::JoinHandle};
use tokio_util::codec::Decoder;
use tracing::{debug, trace, warn};
use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
use vector_config::configurable_component;

use super::{BoxedFramingError, FramingError};
use crate::{BytesDecoder, StreamDecodingError};

const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;

const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}

/// Config used to build a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}

impl ChunkedGelfDecoderConfig {
    /// Builds a `ChunkedGelfDecoder` from this configuration.
    pub fn build(&self) -> ChunkedGelfDecoder {
        ChunkedGelfDecoder::new(
            self.chunked_gelf.timeout_secs,
            self.chunked_gelf.pending_messages_limit,
            self.chunked_gelf.max_length,
            self.chunked_gelf.decompression,
        )
    }
}

/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, for a chunked message to be fully received. If the timeout
    /// elapses before all of its chunks have arrived, the buffered chunks are discarded.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending, not fully received messages. When this limit is reached,
    /// further chunks are dropped and an error is emitted. No limit is enforced by default.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length, in bytes, of a reassembled message before decompression. Messages
    /// exceeding this length are dropped along with all of their buffered chunks. No limit is
    /// enforced by default.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// The decompression to apply to decoded messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}

/// Decompression options for the chunked GELF decoder.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the compression format from the payload's magic bytes.
    #[derivative(Default)]
    Auto,
    /// Gzip decompression.
    Gzip,
    /// Zlib decompression.
    Zlib,
    /// No decompression.
    None,
}

impl ChunkedGelfDecompressionConfig {
    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
        match self {
            Self::Auto => ChunkedGelfDecompression::from_magic(data),
            Self::Gzip => ChunkedGelfDecompression::Gzip,
            Self::Zlib => ChunkedGelfDecompression::Zlib,
            Self::None => ChunkedGelfDecompression::None,
        }
    }
}

#[derive(Debug)]
struct MessageState {
    total_chunks: u8,
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    chunks_bitmap: u128,
    current_length: usize,
    timeout_task: JoinHandle<()>,
}

impl MessageState {
    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
        Self {
            total_chunks,
            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
            chunks_bitmap: 0,
            current_length: 0,
            timeout_task,
        }
    }

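    // Each bit of `chunks_bitmap` marks whether the chunk with that sequence number has
    // already been received; a `u128` covers the protocol maximum of 128 chunks per message.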
    fn is_chunk_present(&self, sequence_number: u8) -> bool {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap & chunk_bitmap_id != 0
    }

    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap |= chunk_bitmap_id;
        self.current_length += chunk.remaining();
        self.chunks[sequence_number as usize] = chunk;
    }

    fn is_complete(&self) -> bool {
        self.chunks_bitmap.count_ones() == self.total_chunks as u32
    }

    fn current_length(&self) -> usize {
        self.current_length
    }

    fn retrieve_message(&self) -> Option<Bytes> {
        if self.is_complete() {
            self.timeout_task.abort();
            let chunks = &self.chunks[0..self.total_chunks as usize];
            let mut message = BytesMut::new();
            for chunk in chunks {
                message.extend_from_slice(chunk);
            }
            Some(message.freeze())
        } else {
            None
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    Gzip,
    Zlib,
    None,
}

impl ChunkedGelfDecompression {
    pub fn from_magic(data: &Bytes) -> Self {
        if data.starts_with(GZIP_MAGIC) {
            trace!("Detected Gzip compression");
            return Self::Gzip;
        }

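        // Per RFC 1950, the first two bytes of a zlib stream (CMF and FLG), read as a
        // big-endian u16, must be a multiple of 31, so the magic byte alone is not conclusive.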
        if data.starts_with(ZLIB_MAGIC) {
            if let Some([first_byte, second_byte]) = data.get(0..2)
                && (*first_byte as u16 * 256 + *second_byte as u16).is_multiple_of(31)
            {
                trace!("Detected Zlib compression");
                return Self::Zlib;
            }

            warn!(
                "Detected Zlib magic bytes but the header is invalid: {:?}",
                data.get(0..2)
            );
        }

        trace!("No compression detected");
        Self::None
    }

    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
        let decompressed = match self {
            Self::Gzip => {
                let mut decoder = MultiGzDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(GzipDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::Zlib => {
                let mut decoder = ZlibDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(ZlibDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::None => data,
        };
        Ok(decompressed)
    }
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    #[snafu(display("Invalid chunk header with fewer than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
    ))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than or equal to its total chunks value of {total_chunks}"
    ))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
    ))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a mismatched total chunks value: the original total chunks value is {original_total_chunks} but the received total chunks value is {received_total_chunks}"
    ))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    #[snafu(display(
        "Message with id {message_id} exceeded the maximum message length and will be dropped: got {length} bytes, but the maximum message length is {max_length} bytes. Discarding all buffered chunks of that message"
    ))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    #[snafu(display("Error while decompressing message: {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}

impl StreamDecodingError for ChunkedGelfDecoderError {
    fn can_continue(&self) -> bool {
        true
    }
}

impl FramingError for ChunkedGelfDecoderError {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}

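/// A decoder for GELF messages that may be split into chunks, as sent by UDP-based GELF
/// clients. Chunks belonging to the same message are buffered until the message is
/// complete, and the reassembled payload is then optionally decompressed.
///
/// A minimal usage sketch (illustrative only, not a compiled doctest; it assumes this type
/// and `tokio_util::codec::Decoder` are in scope):
///
/// ```ignore
/// use bytes::BytesMut;
/// use tokio_util::codec::Decoder;
///
/// let mut decoder = ChunkedGelfDecoder::default();
/// // A complete, unchunked GELF payload decodes to a single frame.
/// let mut src = BytesMut::from(&br#"{"version":"1.1","short_message":"hi"}"#[..]);
/// let frame = decoder.decode_eof(&mut src).unwrap();
/// assert!(frame.is_some());
/// ```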
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    /// Inner framer that yields the raw byte frames which are then interpreted as GELF
    /// payloads (either a single chunk or a whole unchunked message).
    bytes_decoder: BytesDecoder,
    decompression_config: ChunkedGelfDecompressionConfig,
    /// Pending, partially received messages keyed by message id.
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    timeout: Duration,
    pending_messages_limit: Option<usize>,
    max_length: Option<usize>,
}

impl ChunkedGelfDecoder {
    /// Creates a new `ChunkedGelfDecoder`.
    pub fn new(
        timeout_secs: f64,
        pending_messages_limit: Option<usize>,
        max_length: Option<usize>,
        decompression_config: ChunkedGelfDecompressionConfig,
    ) -> Self {
        Self {
            bytes_decoder: BytesDecoder::new(),
            decompression_config,
            state: Arc::new(Mutex::new(HashMap::new())),
            timeout: Duration::from_secs_f64(timeout_secs),
            pending_messages_limit,
            max_length,
        }
    }

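    /// Decodes a single GELF chunk, buffering it until every chunk of its message has
    /// arrived.
    ///
    /// The chunk must start with the 10-byte header that follows the GELF magic bytes:
    /// an 8-byte message id, a 1-byte sequence number, and a 1-byte total chunks count.
    /// Returns `Ok(Some(message))` once the final missing chunk of a message is received,
    /// and `Ok(None)` while chunks are still pending.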
    pub fn decode_chunk(
        &mut self,
        mut chunk: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
        ensure!(
            chunk.remaining() >= 10,
            InvalidChunkHeaderSnafu { header: chunk }
        );

        let message_id = chunk.get_u64();
        let sequence_number = chunk.get_u8();
        let total_chunks = chunk.get_u8();

        ensure!(
            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
            InvalidTotalChunksSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        ensure!(
            sequence_number < total_chunks,
            InvalidSequenceNumberSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        let mut state_lock = self.state.lock().expect("poisoned lock");

        if let Some(pending_messages_limit) = self.pending_messages_limit {
            ensure!(
                state_lock.len() < pending_messages_limit,
                PendingMessagesLimitReachedSnafu {
                    message_id,
                    sequence_number,
                    pending_messages_limit
                }
            );
        }

        let message_state = state_lock.entry(message_id).or_insert_with(|| {
            // Schedule a task that discards the pending message if it is not completed
            // within the timeout window.
            let state = Arc::clone(&self.state);
            let timeout = self.timeout;
            let timeout_handle = tokio::spawn(async move {
                tokio::time::sleep(timeout).await;
                let mut state_lock = state.lock().expect("poisoned lock");
                if state_lock.remove(&message_id).is_some() {
                    warn!(
                        message_id = message_id,
                        timeout_secs = timeout.as_secs_f64(),
                        "Message was not fully received within the timeout window. Discarding it."
                    );
                }
            });
            MessageState::new(total_chunks, timeout_handle)
        });

        ensure!(
            message_state.total_chunks == total_chunks,
            TotalChunksMismatchSnafu {
                message_id,
                sequence_number,
                original_total_chunks: message_state.total_chunks,
                received_total_chunks: total_chunks
            }
        );

        if message_state.is_chunk_present(sequence_number) {
            debug!(
                message_id = message_id,
                sequence_number = sequence_number,
                "Received a duplicate chunk. Ignoring it."
            );
            return Ok(None);
        }

        message_state.add_chunk(sequence_number, chunk);

        if let Some(max_length) = self.max_length {
            let length = message_state.current_length();
            if length > max_length {
                state_lock.remove(&message_id);
                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
                    message_id,
                    sequence_number,
                    length,
                    max_length,
                });
            }
        }

        if let Some(message) = message_state.retrieve_message() {
            state_lock.remove(&message_id);
            Ok(Some(message))
        } else {
            Ok(None)
        }
    }

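    /// Decodes a complete GELF message from the given bytes.
    ///
    /// Payloads starting with the GELF magic bytes (`0x1e 0x0f`) are treated as chunks of a
    /// chunked message and handed to [`Self::decode_chunk`]; anything else is treated as a
    /// whole, unchunked message. Complete messages are then decompressed according to the
    /// configured decompression mode.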
    pub fn decode_message(
        &mut self,
        mut src: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
        let message = if src.starts_with(GELF_MAGIC) {
            trace!("Received a chunked GELF message based on the magic bytes");
            src.advance(2);
            self.decode_chunk(src)?
        } else {
            trace!(
                "Received an unchunked GELF message. First two bytes of message: {:?}",
                src.get(0..2)
            );
            Some(src)
        };

        message
            .map(|message| {
                self.decompression_config
                    .get_decompression(&message)
                    .decompress(message)
                    .context(DecompressionSnafu)
            })
            .transpose()
    }
}

impl Default for ChunkedGelfDecoder {
    fn default() -> Self {
        Self::new(
            DEFAULT_TIMEOUT_SECS,
            None,
            None,
            ChunkedGelfDecompressionConfig::Auto,
        )
    }
}

impl Decoder for ChunkedGelfDecoder {
    type Item = Bytes;

    type Error = BoxedFramingError;

    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode(src)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }

    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode_eof(buf)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }
}

#[cfg(test)]
mod tests {
    use std::{fmt::Write as FmtWrite, io::Write as IoWrite};

    use bytes::{BufMut, BytesMut};
    use flate2::write::{GzEncoder, ZlibEncoder};
    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
    use rstest::{fixture, rstest};
    use tracing_test::traced_test;

    use super::*;

    pub enum Compression {
        Gzip,
        Zlib,
    }

    impl Compression {
        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
            self.compress_with_level(payload, flate2::Compression::default())
        }

        pub fn compress_with_level(
            &self,
            payload: &impl AsRef<[u8]>,
            level: flate2::Compression,
        ) -> Bytes {
            match self {
                Compression::Gzip => {
                    let mut encoder = GzEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
                Compression::Zlib => {
                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
            }
        }
    }

    fn create_chunk(
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
        payload: &impl AsRef<[u8]>,
    ) -> BytesMut {
        let mut chunk = BytesMut::new();
        chunk.put_slice(GELF_MAGIC);
        chunk.put_u64(message_id);
        chunk.put_u8(sequence_number);
        chunk.put_u8(total_chunks);
        chunk.extend_from_slice(payload.as_ref());
        chunk
    }

    #[fixture]
    fn unchunked_message() -> (BytesMut, String) {
        let payload = "foo";
        (BytesMut::from(payload), payload.to_string())
    }

    #[fixture]
    fn two_chunks_message() -> ([BytesMut; 2], String) {
        let message_id = 1u64;
        let total_chunks = 2u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        (
            [first_chunk, second_chunk],
            format!("{first_payload}{second_payload}"),
        )
    }

    #[fixture]
    fn three_chunks_message() -> ([BytesMut; 3], String) {
        let message_id = 2u64;
        let total_chunks = 3u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        let third_sequence_number = 2u8;
        let third_payload = "baz";
        let third_chunk = create_chunk(
            message_id,
            third_sequence_number,
            total_chunks,
            &third_payload,
        );

        (
            [first_chunk, second_chunk, third_chunk],
            format!("{first_payload}{second_payload}{third_payload}"),
        )
    }

    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
        error
            .as_any()
            .downcast_ref::<ChunkedGelfDecoderError>()
            .expect("Expected ChunkedGelfDecoderError to be downcasted")
    }

    #[rstest]
    #[tokio::test]
    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
        let (mut message, expected_message) = unchunked_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_messages(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));

        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_mixed_chunked_and_unchunked_messages(
        unchunked_message: (BytesMut, String),
        two_chunks_message: ([BytesMut; 2], String),
    ) {
        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
        let (mut chunks, expected_chunked_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
    }

    #[tokio::test]
    async fn decode_shuffled_messages() {
        let mut rng = SmallRng::seed_from_u64(42);
        let total_chunks = 100u8;
        let first_message_id = 1u64;
        let first_payload = "first payload";
        let second_message_id = 2u64;
        let second_payload = "second payload";
        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                first_message_id,
                sequence_number,
                total_chunks,
                &first_payload,
            )
        });
        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                second_message_id,
                sequence_number,
                total_chunks,
                &second_payload,
            )
        });
        let expected_first_message = first_payload.repeat(total_chunks as usize);
        let expected_second_message = second_payload.repeat(total_chunks as usize);
        let mut merged_chunks = first_message_chunks
            .chain(second_message_chunks)
            .collect::<Vec<_>>();
        merged_chunks.shuffle(&mut rng);
        let mut decoder = ChunkedGelfDecoder::default();

        let mut count = 0;
        let first_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };
        let second_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };

        assert_eq!(second_retrieved_message, expected_first_message);
        assert_eq!(first_retrieved_message, expected_second_message);
    }

    #[rstest]
    #[tokio::test(start_paused = true)]
    #[traced_test]
    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(!decoder.state.lock().unwrap().is_empty());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it."
        ));

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it"
        ));
    }

    #[tokio::test]
    async fn decode_empty_input() {
        let mut src = BytesMut::new();
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut src).unwrap();
        assert!(frame.is_none());
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_header() {
        let mut src = BytesMut::new();
        src.extend_from_slice(GELF_MAGIC);
        let invalid_chunk = [0x12, 0x34];
        src.extend_from_slice(&invalid_chunk);
        let mut decoder = ChunkedGelfDecoder::default();
        let frame = decoder.decode_eof(&mut src);

        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 1u8;
        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidTotalChunks {
                message_id: 1,
                sequence_number: 1,
                total_chunks: 129,
            }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_sequence_number() {
        let message_id = 1u64;
        let total_chunks = 2u8;
        let invalid_sequence_number = total_chunks + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidSequenceNumber {
                message_id: 1,
                sequence_number: 3,
                total_chunks: 2,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_reached_pending_messages_limit(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, _) = two_chunks_message;
        let (mut three_chunks, _) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            pending_messages_limit: Some(1),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(decoder.state.lock().unwrap().len() == 1);

        let frame = decoder.decode_eof(&mut three_chunks[0]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::PendingMessagesLimitReached {
                message_id: 2u64,
                sequence_number: 0u8,
                pending_messages_limit: 1,
            }
        ));
        assert!(decoder.state.lock().unwrap().len() == 1);
    }

    #[rstest]
    #[tokio::test]
    async fn decode_chunk_with_different_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 0u8;
        let total_chunks = 2u8;
        let payload = "foo";
        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
        let mut second_chunk =
            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut second_chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::TotalChunksMismatch {
                message_id: 1,
                sequence_number: 1,
                original_total_chunks: 2,
                received_total_chunks: 3,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            max_length: Some(5),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        let frame = decoder.decode_eof(&mut chunks[1]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::MaxLengthExceed {
                message_id: 1,
                sequence_number: 1,
                length: 6,
                max_length: 5,
            }
        ));
        assert_eq!(decoder.state.lock().unwrap().len(), 0);
    }

    #[rstest]
    #[tokio::test]
    #[traced_test]
    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
        let message_id = 1u64;
        let max_chunk_size = 5;
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
        let mut chunks = compressed_payload
            .chunks(max_chunk_size)
            .enumerate()
            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
            .collect::<Vec<_>>();
        let (last_chunk, first_chunks) =
            chunks.split_last_mut().expect("chunks should not be empty");
        let mut decoder = ChunkedGelfDecoder::default();

        for chunk in first_chunks {
            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
            assert!(frame.is_none());
        }
        let frame = decoder
            .decode_eof(last_chunk)
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_malformed_gzip_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(GZIP_MAGIC);
        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_malformed_zlib_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(ZLIB_MAGIC);
        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_zlib_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_gzip_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_payload_with_no_decompression_decoder(
        #[case] compression: Compression,
    ) {
        let payload = "foo";
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::None,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.clone().into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, compressed_payload);
    }

    #[test]
    fn detect_gzip_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Gzip,
                "Failed for level {}",
                level.level()
            );
        }
    }

    #[test]
    fn detect_zlib_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Zlib,
                "Failed for level {}",
                level.level()
            );
        }
    }

    #[test]
    fn detect_no_compression() {
        let payload = "foo";

        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());

        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
    }
}