use std::{
    any::Any,
    collections::HashMap,
    io::Read,
    sync::{Arc, Mutex},
    time::Duration,
};

use bytes::{Buf, Bytes, BytesMut};
use derivative::Derivative;
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use snafu::{ResultExt, Snafu, ensure};
use tokio::{self, task::JoinHandle};
use tokio_util::codec::Decoder;
use tracing::{debug, trace, warn};
use vector_common::constants::{GZIP_MAGIC, ZLIB_MAGIC};
use vector_config::configurable_component;

use super::{BoxedFramingError, FramingError};
use crate::{BytesDecoder, StreamDecodingError};

const GELF_MAGIC: &[u8] = &[0x1e, 0x0f];
const GELF_MAX_TOTAL_CHUNKS: u8 = 128;
const DEFAULT_TIMEOUT_SECS: f64 = 5.0;

const fn default_timeout_secs() -> f64 {
    DEFAULT_TIMEOUT_SECS
}

/// Config used to build a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ChunkedGelfDecoderConfig {
    /// Options for the chunked GELF decoder.
    #[serde(default)]
    pub chunked_gelf: ChunkedGelfDecoderOptions,
}

impl ChunkedGelfDecoderConfig {
    /// Build the `ChunkedGelfDecoder` from this configuration.
    pub fn build(&self) -> ChunkedGelfDecoder {
        ChunkedGelfDecoder::new(
            self.chunked_gelf.timeout_secs,
            self.chunked_gelf.pending_messages_limit,
            self.chunked_gelf.max_length,
            self.chunked_gelf.decompression,
        )
    }
}

/// Options for building a `ChunkedGelfDecoder`.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct ChunkedGelfDecoderOptions {
    /// The timeout, in seconds, to wait for the remaining chunks of a message before
    /// discarding the partially received message.
    #[serde(default = "default_timeout_secs")]
    #[derivative(Default(value = "default_timeout_secs()"))]
    pub timeout_secs: f64,

    /// The maximum number of pending (incomplete) messages kept in memory at any given
    /// time. Unlimited if not set.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub pending_messages_limit: Option<usize>,

    /// The maximum length, in bytes, of a reassembled message. Messages exceeding this
    /// length are dropped. Unlimited if not set.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,

    /// The decompression method to apply to reassembled messages.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub decompression: ChunkedGelfDecompressionConfig,
}

/// Decompression options for chunked GELF messages.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub enum ChunkedGelfDecompressionConfig {
    /// Automatically detect the decompression method from the message's magic bytes.
    #[derivative(Default)]
    Auto,
    /// Gzip decompression.
    Gzip,
    /// Zlib decompression.
    Zlib,
    /// No decompression.
    None,
}

impl ChunkedGelfDecompressionConfig {
    pub fn get_decompression(&self, data: &Bytes) -> ChunkedGelfDecompression {
        match self {
            Self::Auto => ChunkedGelfDecompression::from_magic(data),
            Self::Gzip => ChunkedGelfDecompression::Gzip,
            Self::Zlib => ChunkedGelfDecompression::Zlib,
            Self::None => ChunkedGelfDecompression::None,
        }
    }
}
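
// Reassembly state for one partially received chunked message: each chunk's payload is
// stored at the index given by its sequence number, a `u128` bitmap tracks which
// sequence numbers have arrived, and a spawned timeout task discards the message if it
// is not completed in time.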
#[derive(Debug)]
struct MessageState {
    total_chunks: u8,
    chunks: [Bytes; GELF_MAX_TOTAL_CHUNKS as usize],
    chunks_bitmap: u128,
    current_length: usize,
    timeout_task: JoinHandle<()>,
}

impl MessageState {
    pub const fn new(total_chunks: u8, timeout_task: JoinHandle<()>) -> Self {
        Self {
            total_chunks,
            chunks: [const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize],
            chunks_bitmap: 0,
            current_length: 0,
            timeout_task,
        }
    }

    fn is_chunk_present(&self, sequence_number: u8) -> bool {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap & chunk_bitmap_id != 0
    }

    fn add_chunk(&mut self, sequence_number: u8, chunk: Bytes) {
        let chunk_bitmap_id = 1 << sequence_number;
        self.chunks_bitmap |= chunk_bitmap_id;
        self.current_length += chunk.remaining();
        self.chunks[sequence_number as usize] = chunk;
    }

    fn is_complete(&self) -> bool {
        self.chunks_bitmap.count_ones() == self.total_chunks as u32
    }

    fn current_length(&self) -> usize {
        self.current_length
    }

    fn retrieve_message(&self) -> Option<Bytes> {
        if self.is_complete() {
            self.timeout_task.abort();
            let chunks = &self.chunks[0..self.total_chunks as usize];
            let mut message = BytesMut::new();
            for chunk in chunks {
                message.extend_from_slice(chunk);
            }
            Some(message.freeze())
        } else {
            None
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum ChunkedGelfDecompression {
    Gzip,
    Zlib,
    None,
}

impl ChunkedGelfDecompression {
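    // Detection relies on magic bytes: gzip payloads start with the gzip magic number,
    // while payloads starting with the zlib magic are only accepted as zlib if the
    // two-byte header also passes the FCHECK test (the big-endian u16 formed by the
    // CMF and FLG bytes must be divisible by 31). Anything else is left uncompressed.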
    pub fn from_magic(data: &Bytes) -> Self {
        if data.starts_with(GZIP_MAGIC) {
            trace!("Detected Gzip compression");
            return Self::Gzip;
        }

        if data.starts_with(ZLIB_MAGIC) {
            if let Some([first_byte, second_byte]) = data.get(0..2)
                && (*first_byte as u16 * 256 + *second_byte as u16) % 31 == 0
            {
                trace!("Detected Zlib compression");
                return Self::Zlib;
            }

            warn!(
                "Detected Zlib magic bytes but the header is invalid: {:?}",
                data.get(0..2)
            );
        }

        trace!("No compression detected");
        Self::None
    }

    pub fn decompress(&self, data: Bytes) -> Result<Bytes, ChunkedGelfDecompressionError> {
        let decompressed = match self {
            Self::Gzip => {
                let mut decoder = MultiGzDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(GzipDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::Zlib => {
                let mut decoder = ZlibDecoder::new(data.reader());
                let mut decompressed = Vec::new();
                decoder
                    .read_to_end(&mut decompressed)
                    .context(ZlibDecompressionSnafu)?;
                Bytes::from(decompressed)
            }
            Self::None => data,
        };
        Ok(decompressed)
    }
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecompressionError {
    #[snafu(display("Gzip decompression error: {source}"))]
    GzipDecompression { source: std::io::Error },
    #[snafu(display("Zlib decompression error: {source}"))]
    ZlibDecompression { source: std::io::Error },
}

#[derive(Debug, Snafu)]
pub enum ChunkedGelfDecoderError {
    #[snafu(display("Invalid chunk header with less than 10 bytes: 0x{header:0x}"))]
    InvalidChunkHeader { header: Bytes },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has an invalid total chunks value of {total_chunks}. It must be between 1 and {GELF_MAX_TOTAL_CHUNKS}."
    ))]
    InvalidTotalChunks {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has a sequence number greater than its total chunks value of {total_chunks}"
    ))]
    InvalidSequenceNumber {
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
    },
    #[snafu(display(
        "Pending messages limit of {pending_messages_limit} reached while processing chunk with message id {message_id} and sequence number {sequence_number}"
    ))]
    PendingMessagesLimitReached {
        message_id: u64,
        sequence_number: u8,
        pending_messages_limit: usize,
    },
    #[snafu(display(
        "Received chunk with message id {message_id} and sequence number {sequence_number} has different total chunks values: original total chunks value is {original_total_chunks} and received total chunks value is {received_total_chunks}"
    ))]
    TotalChunksMismatch {
        message_id: u64,
        sequence_number: u8,
        original_total_chunks: u8,
        received_total_chunks: u8,
    },
    #[snafu(display(
        "Message with id {message_id} has exceeded the maximum message length and it will be dropped: got {length} bytes and max message length is {max_length} bytes. Discarding all buffered chunks of that message"
    ))]
    MaxLengthExceed {
        message_id: u64,
        sequence_number: u8,
        length: usize,
        max_length: usize,
    },
    #[snafu(display("Error while decompressing message. {source}"))]
    Decompression {
        source: ChunkedGelfDecompressionError,
    },
}

impl StreamDecodingError for ChunkedGelfDecoderError {
    fn can_continue(&self) -> bool {
        true
    }
}

impl FramingError for ChunkedGelfDecoderError {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}

/// A decoder for GELF messages that may arrive chunked and/or compressed.
#[derive(Debug, Clone)]
pub struct ChunkedGelfDecoder {
    bytes_decoder: BytesDecoder,
    decompression_config: ChunkedGelfDecompressionConfig,
    state: Arc<Mutex<HashMap<u64, MessageState>>>,
    timeout: Duration,
    pending_messages_limit: Option<usize>,
    max_length: Option<usize>,
}

impl ChunkedGelfDecoder {
    /// Creates a new `ChunkedGelfDecoder`.
    pub fn new(
        timeout_secs: f64,
        pending_messages_limit: Option<usize>,
        max_length: Option<usize>,
        decompression_config: ChunkedGelfDecompressionConfig,
    ) -> Self {
        Self {
            bytes_decoder: BytesDecoder::new(),
            decompression_config,
            state: Arc::new(Mutex::new(HashMap::new())),
            timeout: Duration::from_secs_f64(timeout_secs),
            pending_messages_limit,
            max_length,
        }
    }

    /// Decode a single GELF chunk (the 2-byte GELF magic must already be stripped).
    /// Returns the reassembled message once all of its chunks have been received.
    pub fn decode_chunk(
        &mut self,
        mut chunk: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
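        // After the 2-byte GELF magic (already stripped by the caller), every chunk
        // carries a 10-byte header: an 8-byte message id, a 1-byte sequence number and
        // a 1-byte total chunk count, followed by the chunk's payload.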
        ensure!(
            chunk.remaining() >= 10,
            InvalidChunkHeaderSnafu { header: chunk }
        );

        let message_id = chunk.get_u64();
        let sequence_number = chunk.get_u8();
        let total_chunks = chunk.get_u8();

        ensure!(
            total_chunks > 0 && total_chunks <= GELF_MAX_TOTAL_CHUNKS,
            InvalidTotalChunksSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        ensure!(
            sequence_number < total_chunks,
            InvalidSequenceNumberSnafu {
                message_id,
                sequence_number,
                total_chunks
            }
        );

        let mut state_lock = self.state.lock().expect("poisoned lock");

        if let Some(pending_messages_limit) = self.pending_messages_limit {
            ensure!(
                state_lock.len() < pending_messages_limit,
                PendingMessagesLimitReachedSnafu {
                    message_id,
                    sequence_number,
                    pending_messages_limit
                }
            );
        }
        let message_state = state_lock.entry(message_id).or_insert_with(|| {
            let state = Arc::clone(&self.state);
            let timeout = self.timeout;
            let timeout_handle = tokio::spawn(async move {
                tokio::time::sleep(timeout).await;
                let mut state_lock = state.lock().expect("poisoned lock");
                if state_lock.remove(&message_id).is_some() {
                    warn!(
                        message_id = message_id,
                        timeout_secs = timeout.as_secs_f64(),
                        internal_log_rate_limit = true,
                        "Message was not fully received within the timeout window. Discarding it."
                    );
                }
            });
            MessageState::new(total_chunks, timeout_handle)
        });

        ensure!(
            message_state.total_chunks == total_chunks,
            TotalChunksMismatchSnafu {
                message_id,
                sequence_number,
                original_total_chunks: message_state.total_chunks,
                received_total_chunks: total_chunks
            }
        );

        if message_state.is_chunk_present(sequence_number) {
            debug!(
                message_id = message_id,
                sequence_number = sequence_number,
                internal_log_rate_limit = true,
                "Received a duplicate chunk. Ignoring it."
            );
            return Ok(None);
        }

        message_state.add_chunk(sequence_number, chunk);

        if let Some(max_length) = self.max_length {
            let length = message_state.current_length();
            if length > max_length {
                state_lock.remove(&message_id);
                return Err(ChunkedGelfDecoderError::MaxLengthExceed {
                    message_id,
                    sequence_number,
                    length,
                    max_length,
                });
            }
        }

        if let Some(message) = message_state.retrieve_message() {
            state_lock.remove(&message_id);
            Ok(Some(message))
        } else {
            Ok(None)
        }
    }
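
    /// Decode a complete frame as a GELF message. Frames starting with the GELF chunk
    /// magic bytes are fed to the chunk reassembler; any other frame is treated as an
    /// unchunked message. The result is then decompressed according to the configured
    /// decompression method.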
    pub fn decode_message(
        &mut self,
        mut src: Bytes,
    ) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
        let message = if src.starts_with(GELF_MAGIC) {
            trace!("Received a chunked GELF message based on the magic bytes");
            src.advance(2);
            self.decode_chunk(src)?
        } else {
            trace!(
                "Received an unchunked GELF message. First two bytes of message: {:?}",
                src.get(0..2)
            );
            Some(src)
        };

        message
            .map(|message| {
                self.decompression_config
                    .get_decompression(&message)
                    .decompress(message)
                    .context(DecompressionSnafu)
            })
            .transpose()
    }
}

impl Default for ChunkedGelfDecoder {
    fn default() -> Self {
        Self::new(
            DEFAULT_TIMEOUT_SECS,
            None,
            None,
            ChunkedGelfDecompressionConfig::Auto,
        )
    }
}
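
// Decoding delegates framing to the inner `BytesDecoder` and then applies GELF chunk
// reassembly and optional decompression to each resulting frame via `decode_message`.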
impl Decoder for ChunkedGelfDecoder {
    type Item = Bytes;

    type Error = BoxedFramingError;

    fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if src.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode(src)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }

    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if buf.is_empty() {
            return Ok(None);
        }

        Ok(self
            .bytes_decoder
            .decode_eof(buf)?
            .and_then(|frame| self.decode_message(frame).transpose())
            .transpose()?)
    }
}

#[cfg(test)]
mod tests {
    use std::{fmt::Write as FmtWrite, io::Write as IoWrite};

    use bytes::{BufMut, BytesMut};
    use flate2::write::{GzEncoder, ZlibEncoder};
    use rand::{SeedableRng, rngs::SmallRng, seq::SliceRandom};
    use rstest::{fixture, rstest};
    use tracing_test::traced_test;

    use super::*;

    pub enum Compression {
        Gzip,
        Zlib,
    }

    impl Compression {
        pub fn compress(&self, payload: &impl AsRef<[u8]>) -> Bytes {
            self.compress_with_level(payload, flate2::Compression::default())
        }

        pub fn compress_with_level(
            &self,
            payload: &impl AsRef<[u8]>,
            level: flate2::Compression,
        ) -> Bytes {
            match self {
                Compression::Gzip => {
                    let mut encoder = GzEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
                Compression::Zlib => {
                    let mut encoder = ZlibEncoder::new(Vec::new(), level);
                    encoder
                        .write_all(payload.as_ref())
                        .expect("failed to write to encoder");
                    encoder.finish().expect("failed to finish encoder").into()
                }
            }
        }
    }

    fn create_chunk(
        message_id: u64,
        sequence_number: u8,
        total_chunks: u8,
        payload: &impl AsRef<[u8]>,
    ) -> BytesMut {
        let mut chunk = BytesMut::new();
        chunk.put_slice(GELF_MAGIC);
        chunk.put_u64(message_id);
        chunk.put_u8(sequence_number);
        chunk.put_u8(total_chunks);
        chunk.extend_from_slice(payload.as_ref());
        chunk
    }

    #[fixture]
    fn unchunked_message() -> (BytesMut, String) {
        let payload = "foo";
        (BytesMut::from(payload), payload.to_string())
    }

    #[fixture]
    fn two_chunks_message() -> ([BytesMut; 2], String) {
        let message_id = 1u64;
        let total_chunks = 2u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        (
            [first_chunk, second_chunk],
            format!("{first_payload}{second_payload}"),
        )
    }

    #[fixture]
    fn three_chunks_message() -> ([BytesMut; 3], String) {
        let message_id = 2u64;
        let total_chunks = 3u8;

        let first_sequence_number = 0u8;
        let first_payload = "foo";
        let first_chunk = create_chunk(
            message_id,
            first_sequence_number,
            total_chunks,
            &first_payload,
        );

        let second_sequence_number = 1u8;
        let second_payload = "bar";
        let second_chunk = create_chunk(
            message_id,
            second_sequence_number,
            total_chunks,
            &second_payload,
        );

        let third_sequence_number = 2u8;
        let third_payload = "baz";
        let third_chunk = create_chunk(
            message_id,
            third_sequence_number,
            total_chunks,
            &third_payload,
        );

        (
            [first_chunk, second_chunk, third_chunk],
            format!("{first_payload}{second_payload}{third_payload}"),
        )
    }

    fn downcast_framing_error(error: &BoxedFramingError) -> &ChunkedGelfDecoderError {
        error
            .as_any()
            .downcast_ref::<ChunkedGelfDecoderError>()
            .expect("Expected ChunkedGelfDecoderError to be downcasted")
    }

    #[rstest]
    #[tokio::test]
    async fn decode_chunked(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }
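
    // A minimal illustrative sketch: a message that advertises a single chunk should be
    // reassembled as soon as its only chunk arrives. The message id (3) and payload are
    // arbitrary values chosen for this example.
    #[tokio::test]
    async fn decode_single_chunk_message() {
        let payload = "foo";
        let mut chunk = create_chunk(3u64, 0u8, 1u8, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk).unwrap();
        assert_eq!(frame, Some(Bytes::from(payload)));
    }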

    #[rstest]
    #[tokio::test]
    async fn decode_unchunked(unchunked_message: (BytesMut, String)) {
        let (mut message, expected_message) = unchunked_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_chunks(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, expected_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_message)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_unordered_messages(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, two_chunks_expected) = two_chunks_message;
        let (mut three_chunks, three_chunks_expected) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut three_chunks[2]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut three_chunks[0]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut two_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(two_chunks_expected)));

        let frame = decoder.decode_eof(&mut three_chunks[1]).unwrap();
        assert_eq!(frame, Some(Bytes::from(three_chunks_expected)));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_mixed_chunked_and_unchunked_messages(
        unchunked_message: (BytesMut, String),
        two_chunks_message: ([BytesMut; 2], String),
    ) {
        let (mut unchunked_message, expected_unchunked_message) = unchunked_message;
        let (mut chunks, expected_chunked_message) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut unchunked_message).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_unchunked_message)));

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert_eq!(frame, Some(Bytes::from(expected_chunked_message)));
    }

    #[tokio::test]
    async fn decode_shuffled_messages() {
        let mut rng = SmallRng::seed_from_u64(42);
        let total_chunks = 100u8;
        let first_message_id = 1u64;
        let first_payload = "first payload";
        let second_message_id = 2u64;
        let second_payload = "second payload";
        let first_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                first_message_id,
                sequence_number,
                total_chunks,
                &first_payload,
            )
        });
        let second_message_chunks = (0..total_chunks).map(|sequence_number| {
            create_chunk(
                second_message_id,
                sequence_number,
                total_chunks,
                &second_payload,
            )
        });
        let expected_first_message = first_payload.repeat(total_chunks as usize);
        let expected_second_message = second_payload.repeat(total_chunks as usize);
        let mut merged_chunks = first_message_chunks
            .chain(second_message_chunks)
            .collect::<Vec<_>>();
        merged_chunks.shuffle(&mut rng);
        let mut decoder = ChunkedGelfDecoder::default();

        let mut count = 0;
        let first_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };
        let second_retrieved_message = loop {
            assert!(count < 2 * total_chunks as usize);
            if let Some(message) = decoder.decode_eof(&mut merged_chunks[count]).unwrap() {
                break message;
            } else {
                count += 1;
            }
        };

        assert_eq!(second_retrieved_message, expected_first_message);
        assert_eq!(first_retrieved_message, expected_second_message);
    }

    #[rstest]
    #[tokio::test(start_paused = true)]
    #[traced_test]
    async fn decode_timeout(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(!decoder.state.lock().unwrap().is_empty());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it."
        ));

        let frame = decoder.decode_eof(&mut chunks[1]).unwrap();
        assert!(frame.is_none());

        tokio::time::sleep(Duration::from_secs_f64(DEFAULT_TIMEOUT_SECS + 1.0)).await;
        assert!(decoder.state.lock().unwrap().is_empty());
        assert!(logs_contain(
            "Message was not fully received within the timeout window. Discarding it"
        ));
    }

    #[tokio::test]
    async fn decode_empty_input() {
        let mut src = BytesMut::new();
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut src).unwrap();
        assert!(frame.is_none());
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_header() {
        let mut src = BytesMut::new();
        src.extend_from_slice(GELF_MAGIC);
        let invalid_chunk = [0x12, 0x34];
        src.extend_from_slice(&invalid_chunk);
        let mut decoder = ChunkedGelfDecoder::default();
        let frame = decoder.decode_eof(&mut src);

        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidChunkHeader { .. }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 1u8;
        let invalid_total_chunks = GELF_MAX_TOTAL_CHUNKS + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, sequence_number, invalid_total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidTotalChunks {
                message_id: 1,
                sequence_number: 1,
                total_chunks: 129,
            }
        ));
    }

    #[tokio::test]
    async fn decode_chunk_with_invalid_sequence_number() {
        let message_id = 1u64;
        let total_chunks = 2u8;
        let invalid_sequence_number = total_chunks + 1;
        let payload = "foo";
        let mut chunk = create_chunk(message_id, invalid_sequence_number, total_chunks, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::InvalidSequenceNumber {
                message_id: 1,
                sequence_number: 3,
                total_chunks: 2,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_reached_pending_messages_limit(
        two_chunks_message: ([BytesMut; 2], String),
        three_chunks_message: ([BytesMut; 3], String),
    ) {
        let (mut two_chunks, _) = two_chunks_message;
        let (mut three_chunks, _) = three_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            pending_messages_limit: Some(1),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut two_chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(decoder.state.lock().unwrap().len() == 1);

        let frame = decoder.decode_eof(&mut three_chunks[0]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::PendingMessagesLimitReached {
                message_id: 2u64,
                sequence_number: 0u8,
                pending_messages_limit: 1,
            }
        ));
        assert!(decoder.state.lock().unwrap().len() == 1);
    }

    #[rstest]
    #[tokio::test]
    async fn decode_chunk_with_different_total_chunks() {
        let message_id = 1u64;
        let sequence_number = 0u8;
        let total_chunks = 2u8;
        let payload = "foo";
        let mut first_chunk = create_chunk(message_id, sequence_number, total_chunks, &payload);
        let mut second_chunk =
            create_chunk(message_id, sequence_number + 1, total_chunks + 1, &payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut first_chunk).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut second_chunk);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::TotalChunksMismatch {
                message_id: 1,
                sequence_number: 1,
                original_total_chunks: 2,
                received_total_chunks: 3,
            }
        ));
    }

    #[rstest]
    #[tokio::test]
    async fn decode_message_greater_than_max_length(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder {
            max_length: Some(5),
            ..Default::default()
        };

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        let frame = decoder.decode_eof(&mut chunks[1]);
        let error = frame.unwrap_err();
        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::MaxLengthExceed {
                message_id: 1,
                sequence_number: 1,
                length: 6,
                max_length: 5,
            }
        ));
        assert_eq!(decoder.state.lock().unwrap().len(), 0);
    }

    #[rstest]
    #[tokio::test]
    #[traced_test]
    async fn decode_duplicated_chunk(two_chunks_message: ([BytesMut; 2], String)) {
        let (mut chunks, _) = two_chunks_message;
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder.decode_eof(&mut chunks[0].clone()).unwrap();
        assert!(frame.is_none());

        let frame = decoder.decode_eof(&mut chunks[0]).unwrap();
        assert!(frame.is_none());
        assert!(logs_contain("Received a duplicate chunk. Ignoring it."));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_unchunked_message(#[case] compression: Compression) {
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder::default();

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_chunked_message(#[case] compression: Compression) {
        let message_id = 1u64;
        let max_chunk_size = 5;
        let payload = (0..100).fold(String::new(), |mut payload, n| {
            write!(payload, "foo{n}").unwrap();
            payload
        });
        let compressed_payload = compression.compress(&payload);
        let total_chunks = compressed_payload.len().div_ceil(max_chunk_size) as u8;
        assert!(total_chunks < GELF_MAX_TOTAL_CHUNKS);
        let mut chunks = compressed_payload
            .chunks(max_chunk_size)
            .enumerate()
            .map(|(i, chunk)| create_chunk(message_id, i as u8, total_chunks, &chunk))
            .collect::<Vec<_>>();
        let (last_chunk, first_chunks) =
            chunks.split_last_mut().expect("chunks should not be empty");
        let mut decoder = ChunkedGelfDecoder::default();

        for chunk in first_chunks {
            let frame = decoder.decode_eof(chunk).expect("decoding should not fail");
            assert!(frame.is_none());
        }
        let frame = decoder
            .decode_eof(last_chunk)
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_malformed_gzip_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(GZIP_MAGIC);
        compressed_payload.extend(&[0x12, 0x34, 0x56, 0x78]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_malformed_zlib_message() {
        let mut compressed_payload = BytesMut::new();
        compressed_payload.extend(ZLIB_MAGIC);
        compressed_payload.extend(&[0x9c, 0x12, 0x00, 0xFF]);
        let mut decoder = ChunkedGelfDecoder::default();

        let error = decoder
            .decode_eof(&mut compressed_payload)
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_zlib_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_zlib_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Zlib,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::ZlibDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_gzip_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Gzip.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, payload);
    }

    #[tokio::test]
    async fn decode_zlib_payload_with_gzip_decoder() {
        let payload = "foo";
        let compressed_payload = Compression::Zlib.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut compressed_payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    async fn decode_uncompressed_payload_with_gzip_decoder() {
        let payload = "foo";
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::Gzip,
            ..Default::default()
        };

        let error = decoder
            .decode_eof(&mut payload.into())
            .expect_err("decoding should fail");

        let downcasted_error = downcast_framing_error(&error);
        assert!(matches!(
            downcasted_error,
            ChunkedGelfDecoderError::Decompression {
                source: ChunkedGelfDecompressionError::GzipDecompression { .. }
            }
        ));
    }

    #[tokio::test]
    #[rstest]
    #[case::gzip(Compression::Gzip)]
    #[case::zlib(Compression::Zlib)]
    async fn decode_compressed_payload_with_no_decompression_decoder(
        #[case] compression: Compression,
    ) {
        let payload = "foo";
        let compressed_payload = compression.compress(&payload);
        let mut decoder = ChunkedGelfDecoder {
            decompression_config: ChunkedGelfDecompressionConfig::None,
            ..Default::default()
        };

        let frame = decoder
            .decode_eof(&mut compressed_payload.clone().into())
            .expect("decoding should not fail")
            .expect("decoding should return a frame");

        assert_eq!(frame, compressed_payload);
    }

    #[test]
    fn detect_gzip_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Gzip.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Gzip,
                "Failed for level {}",
                level.level()
            );
        }
    }

    #[test]
    fn detect_zlib_compression() {
        let payload = "foo";

        for level in 0..=9 {
            let level = flate2::Compression::new(level);
            let compressed_payload = Compression::Zlib.compress_with_level(&payload, level);
            let actual = ChunkedGelfDecompression::from_magic(&compressed_payload);
            assert_eq!(
                actual,
                ChunkedGelfDecompression::Zlib,
                "Failed for level {}",
                level.level()
            );
        }
    }

    #[test]
    fn detect_no_compression() {
        let payload = "foo";

        let detected_compression = ChunkedGelfDecompression::from_magic(&payload.into());

        assert_eq!(detected_compression, ChunkedGelfDecompression::None);
    }
}