codecs/encoding/chunking/gelf.rs

1use std::vec;
2
3use super::Chunking;
4use bytes::{BufMut, Bytes, BytesMut};
5use tracing::trace;
6
/// Largest number of chunks a single GELF message may be split into.
const GELF_MAX_TOTAL_CHUNKS: usize = 128;
/// Length of the header written in front of every chunk: the magic bytes, an 8-byte message ID,
/// a 1-byte sequence number and a 1-byte sequence count (12 bytes in total).
const GELF_CHUNK_HEADERS_LENGTH: usize =
    GELF_MAGIC_BYTES.len() + std::mem::size_of::<u64>() + 2 * std::mem::size_of::<u8>();
/// Magic bytes that open every chunk of a chunked GELF message.
const GELF_MAGIC_BYTES: [u8; 2] = [0x1e, 0x0f];
10
/// Splits oversized payloads using Graylog's native GELF chunking scheme, as described in the
/// [GELF chunking documentation][gelf-chunking].
///
/// A single message can be split into at most 128 chunks. The size of each chunk is configured
/// through [`GelfChunker::max_chunk_size`].
///
/// [gelf-chunking]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html#chunking
#[derive(Clone, Debug)]
pub struct GelfChunker {
    /// Upper bound, in bytes, on the size of every emitted chunk, header included.
    ///
    /// Every chunk starts with a 12-byte GELF header. This value must therefore be at least
    /// 13 bytes, so that each chunk carries at least one byte of data. No upper limit is enforced,
    /// because the practical ceiling depends on the transport protocol and on the network
    /// interface settings. IP frames are commonly capped at 64KiB, and UDP and GELF headers reduce
    /// the usable payload further.
    ///
    /// To stay on the safe side, avoid values above 65,500 bytes unless your network supports
    /// [Jumbograms][jumbogram].
    ///
    /// [jumbogram]: https://en.wikipedia.org/wiki/Jumbogram
    pub max_chunk_size: usize,
}
26
27impl Chunking for GelfChunker {
28    fn chunk(&self, bytes: Bytes) -> Result<Vec<Bytes>, vector_common::Error> {
29        if bytes.len() <= self.max_chunk_size {
30            return Ok(vec![bytes]);
31        }
32
33        let chunk_size = self.max_chunk_size - GELF_CHUNK_HEADERS_LENGTH;
34        let message_id: u64 = rand::random();
35        let chunk_count = bytes.len().div_ceil(chunk_size);
36
37        trace!(
38            message_id = message_id,
39            chunk_count = chunk_count,
40            chunk_size = chunk_size,
41            "Generating chunks for GELF."
42        );
43
44        if chunk_count > GELF_MAX_TOTAL_CHUNKS {
45            return Err(vector_common::Error::from(format!(
46                "Too many chunks to generate for GELF: {}, max: {}",
47                chunk_count, GELF_MAX_TOTAL_CHUNKS
48            )));
49        }
50
51        // Split into chunks and add headers to each slice.
52        // Map with index to determine sequence number.
53        let chunks = bytes
54            .chunks(chunk_size)
55            .enumerate()
56            .map(|(i, chunk)| {
57                let framed = Bytes::copy_from_slice(chunk);
58                let sequence_number = i as u8;
59                let sequence_count = chunk_count as u8;
60
61                let mut headers = BytesMut::with_capacity(GELF_CHUNK_HEADERS_LENGTH);
62                headers.put_slice(&GELF_MAGIC_BYTES);
63                headers.put_u64(message_id);
64                headers.put_u8(sequence_number);
65                headers.put_u8(sequence_count);
66
67                [headers.freeze(), framed].concat().into()
68            })
69            .collect();
70        Ok(chunks)
71    }
72}
73
#[cfg(test)]
mod tests {
    use bytes::Bytes;

    use super::{
        Chunking, GELF_CHUNK_HEADERS_LENGTH, GELF_MAGIC_BYTES, GELF_MAX_TOTAL_CHUNKS, GelfChunker,
    };
    use crate::encoding::Chunker;

    /// Checks the GELF header of one chunk.
    /// Layout: bytes 0-1 are the magic bytes, 2-9 the random message ID (checked separately),
    /// byte 10 the sequence number and byte 11 the sequence count.
    fn assert_chunk_header(chunk: &Bytes, sequence_number: usize, sequence_count: usize) {
        assert_eq!(chunk[0..2], GELF_MAGIC_BYTES);
        assert_eq!(chunk[10], sequence_number as u8);
        assert_eq!(chunk[11], sequence_count as u8);
    }

    /// Checks that every chunk carries the same message ID (bytes 2-9).
    fn assert_same_message_id(chunks: &[Bytes]) {
        let message_id = &chunks[0][2..10];
        for chunk in chunks {
            assert_eq!(&chunk[2..10], message_id);
        }
    }

    #[test]
    fn test_gelf_chunker_noop() {
        let chunker = Chunker::Gelf(GelfChunker {
            max_chunk_size: 8192,
        });
        let input = Bytes::from("1234123412341234123");
        let chunks = chunker.chunk(input.clone()).unwrap();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0], input);
    }

    #[test]
    fn test_gelf_chunker_chunk() {
        let chunker = Chunker::Gelf(GelfChunker {
            max_chunk_size: GELF_CHUNK_HEADERS_LENGTH + 4,
        });
        // Input for 5 chunks of 4 bytes: [1234] [1234] [1234] [1234] [123]
        let input = Bytes::from("1234123412341234123");
        let chunks = chunker.chunk(input).unwrap();
        assert_eq!(chunks.len(), 5);

        for (i, chunk) in chunks.iter().enumerate() {
            let expected_payload: &[u8] = if i < 4 { b"1234" } else { b"123" };
            assert_eq!(chunk.len(), GELF_CHUNK_HEADERS_LENGTH + expected_payload.len());
            assert_chunk_header(chunk, i, chunks.len());
            assert_eq!(&chunk[GELF_CHUNK_HEADERS_LENGTH..], expected_payload);
        }
        assert_same_message_id(&chunks);
    }

    #[test]
    fn test_gelf_chunker_max() {
        const PAYLOAD_SIZE: usize = 65500;
        let chunker = Chunker::Gelf(GelfChunker {
            max_chunk_size: GELF_CHUNK_HEADERS_LENGTH + PAYLOAD_SIZE,
        });
        // Input for 128 chunks of 65500 bytes of data. It is built on the heap so that the test
        // binary does not embed an ~8 MB static array.
        let input = Bytes::from(vec![0u8; PAYLOAD_SIZE * 128]);
        let chunks = chunker.chunk(input).unwrap();
        assert_eq!(chunks.len(), 128);

        for (i, chunk) in chunks.iter().enumerate() {
            assert_eq!(chunk.len(), GELF_CHUNK_HEADERS_LENGTH + PAYLOAD_SIZE);
            assert_chunk_header(chunk, i, chunks.len());
            assert!(chunk[GELF_CHUNK_HEADERS_LENGTH..].iter().all(|&b| b == 0));
        }
        assert_same_message_id(&chunks);
    }

    #[test]
    fn test_gelf_chunker_chunk_limit() {
        // One byte of data per chunk, so the chunk count equals the input length.
        let chunker = Chunker::Gelf(GelfChunker {
            max_chunk_size: GELF_CHUNK_HEADERS_LENGTH + 1,
        });

        // Exactly at the limit: accepted.
        let at_limit = Bytes::from(vec![0u8; GELF_MAX_TOTAL_CHUNKS]);
        assert_eq!(chunker.chunk(at_limit).unwrap().len(), GELF_MAX_TOTAL_CHUNKS);

        // One chunk over the limit: rejected.
        let over_limit = Bytes::from(vec![0u8; GELF_MAX_TOTAL_CHUNKS + 1]);
        assert!(chunker.chunk(over_limit).is_err());
    }
}