codecs/encoding/chunking/
gelf.rs1use std::vec;
2
3use super::Chunking;
4use bytes::{BufMut, Bytes, BytesMut};
5use tracing::trace;
6
7const GELF_MAX_TOTAL_CHUNKS: usize = 128;
8const GELF_CHUNK_HEADERS_LENGTH: usize = 12;
9const GELF_MAGIC_BYTES: [u8; 2] = [0x1e, 0x0f];
10
11#[derive(Clone, Debug)]
16pub struct GelfChunker {
17 pub max_chunk_size: usize,
25}
26
27impl Chunking for GelfChunker {
28 fn chunk(&self, bytes: Bytes) -> Result<Vec<Bytes>, vector_common::Error> {
29 if bytes.len() <= self.max_chunk_size {
30 return Ok(vec![bytes]);
31 }
32
33 let chunk_size = self.max_chunk_size - GELF_CHUNK_HEADERS_LENGTH;
34 let message_id: u64 = rand::random();
35 let chunk_count = bytes.len().div_ceil(chunk_size);
36
37 trace!(
38 message_id = message_id,
39 chunk_count = chunk_count,
40 chunk_size = chunk_size,
41 "Generating chunks for GELF."
42 );
43
44 if chunk_count > GELF_MAX_TOTAL_CHUNKS {
45 return Err(vector_common::Error::from(format!(
46 "Too many chunks to generate for GELF: {}, max: {}",
47 chunk_count, GELF_MAX_TOTAL_CHUNKS
48 )));
49 }
50
51 let chunks = bytes
54 .chunks(chunk_size)
55 .enumerate()
56 .map(|(i, chunk)| {
57 let framed = Bytes::copy_from_slice(chunk);
58 let sequence_number = i as u8;
59 let sequence_count = chunk_count as u8;
60
61 let mut headers = BytesMut::with_capacity(GELF_CHUNK_HEADERS_LENGTH);
62 headers.put_slice(&GELF_MAGIC_BYTES);
63 headers.put_u64(message_id);
64 headers.put_u8(sequence_number);
65 headers.put_u8(sequence_count);
66
67 [headers.freeze(), framed].concat().into()
68 })
69 .collect();
70 Ok(chunks)
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use bytes::Bytes;
77
78 use super::{Chunking, GELF_CHUNK_HEADERS_LENGTH, GELF_MAGIC_BYTES, GelfChunker};
79 use crate::encoding::Chunker;
80
81 #[test]
82 fn test_gelf_chunker_noop() {
83 let chunker = Chunker::Gelf(GelfChunker {
84 max_chunk_size: 8192,
85 });
86 let input = Bytes::from("1234123412341234123");
87 let chunks = chunker.chunk(input.clone()).unwrap();
88 assert_eq!(chunks.len(), 1);
89 assert_eq!(chunks[0], input);
90 }
91
92 #[test]
93 fn test_gelf_chunker_chunk() {
94 let chunker = Chunker::Gelf(GelfChunker {
95 max_chunk_size: GELF_CHUNK_HEADERS_LENGTH + 4,
96 });
97 let input = Bytes::from("1234123412341234123");
99 let chunks = chunker.chunk(input).unwrap();
100 assert_eq!(chunks.len(), 5);
101
102 for i in 0..chunks.len() {
103 if i < 4 {
104 assert_eq!(chunks[i].len(), GELF_CHUNK_HEADERS_LENGTH + 4);
105 } else {
106 assert_eq!(chunks[i].len(), GELF_CHUNK_HEADERS_LENGTH + 3);
107 }
108 assert_eq!(chunks[i][0..2], GELF_MAGIC_BYTES);
110 assert_eq!(chunks[i][10], i as u8);
113 assert_eq!(chunks[i][11], chunks.len() as u8);
115 if i < 4 {
117 assert_eq!(&chunks[i][GELF_CHUNK_HEADERS_LENGTH..], b"1234");
118 } else {
119 assert_eq!(&chunks[i][GELF_CHUNK_HEADERS_LENGTH..], b"123");
120 }
121 }
122 }
123
124 #[test]
125 fn test_gelf_chunker_max() {
126 let chunker = Chunker::Gelf(GelfChunker {
127 max_chunk_size: GELF_CHUNK_HEADERS_LENGTH + 65500,
128 });
129 let input = Bytes::from_static(&[0; 65500 * 128]);
131 let chunks = chunker.chunk(input).unwrap();
132 assert_eq!(chunks.len(), 128);
133
134 for i in 0..chunks.len() {
135 assert_eq!(chunks[i].len(), GELF_CHUNK_HEADERS_LENGTH + 65500);
136 assert_eq!(chunks[i][0..2], GELF_MAGIC_BYTES);
138 assert_eq!(chunks[i][10], i as u8);
141 assert_eq!(chunks[i][11], chunks.len() as u8);
143 assert_eq!(&chunks[i][GELF_CHUNK_HEADERS_LENGTH..], &[0; 65500]);
145 }
146 }
147}