vector/sinks/util/buffer/
mod.rs1use std::io::Write;
2
3use bytes::{BufMut, BytesMut};
4use flate2::write::{GzEncoder, ZlibEncoder};
5
6use super::{
7 batch::{err_event_too_large, Batch, BatchSize, PushResult},
8 snappy::SnappyEncoder,
9 zstd::ZstdEncoder,
10};
11
12pub mod compression;
13pub mod json;
14pub mod metrics;
15pub mod partition;
16pub mod vec;
17
18pub use compression::Compression;
19pub use partition::{Partition, PartitionBuffer, PartitionInnerBuffer};
20
21#[derive(Debug)]
22pub struct Buffer {
23 inner: Option<InnerBuffer>,
24 num_items: usize,
25 num_bytes: usize,
26 settings: BatchSize<Self>,
27 compression: Compression,
28}
29
30#[derive(Debug)]
31pub enum InnerBuffer {
32 Plain(bytes::buf::Writer<BytesMut>),
33 Gzip(GzEncoder<bytes::buf::Writer<BytesMut>>),
34 Zlib(ZlibEncoder<bytes::buf::Writer<BytesMut>>),
35 Zstd(ZstdEncoder<bytes::buf::Writer<BytesMut>>),
36 Snappy(SnappyEncoder<bytes::buf::Writer<BytesMut>>),
37}
38
39impl Buffer {
40 pub const fn new(settings: BatchSize<Self>, compression: Compression) -> Self {
41 Self {
42 inner: None,
43 num_items: 0,
44 num_bytes: 0,
45 settings,
46 compression,
47 }
48 }
49
50 fn buffer(&mut self) -> &mut InnerBuffer {
51 let bytes = self.settings.bytes;
52 let compression = self.compression;
53 self.inner.get_or_insert_with(|| {
54 let writer = BytesMut::with_capacity(bytes).writer();
55 match compression {
56 Compression::None => InnerBuffer::Plain(writer),
57 Compression::Gzip(level) => {
58 InnerBuffer::Gzip(GzEncoder::new(writer, level.as_flate2()))
59 }
60 Compression::Zlib(level) => {
61 InnerBuffer::Zlib(ZlibEncoder::new(writer, level.as_flate2()))
62 }
63 Compression::Zstd(level) => InnerBuffer::Zstd(
64 ZstdEncoder::new(writer, level.into())
65 .expect("Zstd encoder should not fail on init."),
66 ),
67 Compression::Snappy => InnerBuffer::Snappy(SnappyEncoder::new(writer)),
68 }
69 })
70 }
71
72 pub fn push(&mut self, input: &[u8]) {
73 self.num_items += 1;
74 match self.buffer() {
75 InnerBuffer::Plain(inner) => {
76 inner.write_all(input).unwrap();
77 }
78 InnerBuffer::Gzip(inner) => {
79 inner.write_all(input).unwrap();
80 }
81 InnerBuffer::Zlib(inner) => {
82 inner.write_all(input).unwrap();
83 }
84 InnerBuffer::Zstd(inner) => {
85 inner.write_all(input).unwrap();
86 }
87 InnerBuffer::Snappy(inner) => inner.write_all(input).unwrap(),
88 }
89 }
90
91 pub fn is_empty(&self) -> bool {
92 self.inner
93 .as_ref()
94 .map(|inner| match inner {
95 InnerBuffer::Plain(inner) => inner.get_ref().is_empty(),
96 InnerBuffer::Gzip(inner) => inner.get_ref().get_ref().is_empty(),
97 InnerBuffer::Zlib(inner) => inner.get_ref().get_ref().is_empty(),
98 InnerBuffer::Zstd(inner) => inner.get_ref().get_ref().is_empty(),
99 InnerBuffer::Snappy(inner) => inner.is_empty(),
100 })
101 .unwrap_or(true)
102 }
103}
104
105impl Batch for Buffer {
106 type Input = BytesMut;
107 type Output = BytesMut;
108
109 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
110 let new_bytes = self.num_bytes + item.len();
114 if self.is_empty() && item.len() > self.settings.bytes {
115 err_event_too_large(item.len(), self.settings.bytes)
116 } else if self.num_items >= self.settings.events || new_bytes > self.settings.bytes {
117 PushResult::Overflow(item)
118 } else {
119 self.push(&item);
120 self.num_bytes = new_bytes;
121 PushResult::Ok(
122 self.num_items >= self.settings.events || new_bytes >= self.settings.bytes,
123 )
124 }
125 }
126
127 fn is_empty(&self) -> bool {
128 self.is_empty()
129 }
130
131 fn fresh(&self) -> Self {
132 Self::new(self.settings, self.compression)
133 }
134
135 fn finish(self) -> Self::Output {
136 match self.inner {
137 Some(InnerBuffer::Plain(inner)) => inner.into_inner(),
138 Some(InnerBuffer::Gzip(inner)) => inner
139 .finish()
140 .expect("This can't fail because the inner writer is a Vec")
141 .into_inner(),
142 Some(InnerBuffer::Zlib(inner)) => inner
143 .finish()
144 .expect("This can't fail because the inner writer is a Vec")
145 .into_inner(),
146 Some(InnerBuffer::Zstd(inner)) => inner
147 .finish()
148 .expect("This can't fail because the inner writer is a Vec")
149 .into_inner(),
150 Some(InnerBuffer::Snappy(inner)) => inner
151 .finish()
152 .expect("This can't fail because the inner writer is a Vec")
153 .into_inner(),
154 None => BytesMut::new(),
155 }
156 }
157
158 fn num_items(&self) -> usize {
159 self.num_items
160 }
161}
162
#[cfg(test)]
mod test {
    use std::{
        io::Read,
        sync::{Arc, Mutex},
    };

    use bytes::{Buf, BytesMut};
    use futures::{future, stream, SinkExt, StreamExt};
    use tokio::time::Duration;
    use vector_lib::json_size::JsonSize;

    use super::{Buffer, Compression};
    use crate::sinks::util::{BatchSettings, BatchSink, EncodedEvent};

    /// Payload carried by every event in the gzip round-trip test.
    const LINE: &str = "It's going down, I'm yelling timber, You better move, you better dance";

    /// How many copies of `LINE` are pushed through the sink.
    const EVENT_COUNT: usize = 100_000;

    /// Sends `EVENT_COUNT` identical events through a gzip-compressing `BatchSink`.
    /// Checks that they are split across several requests and that the total
    /// request size stays under 80 000 bytes. Also checks that decompressing the
    /// requests in order reproduces the input exactly.
    #[tokio::test]
    async fn gzip() {
        use flate2::read::MultiGzDecoder;

        // Every request handed to the service is recorded here for inspection.
        let captured = Arc::new(Mutex::new(Vec::new()));
        let service = tower::service_fn(|request| {
            captured.lock().unwrap().push(request);
            future::ok::<_, std::io::Error>(())
        });

        let mut settings = BatchSettings::default();
        settings.size.bytes = 100_000;
        settings.size.events = 1_000;
        settings.timeout = Duration::from_secs(0);

        let sink = BatchSink::new(
            service,
            Buffer::new(settings.size, Compression::gzip_default()),
            settings.timeout,
        );

        let mut events = stream::iter(std::iter::repeat_n(BytesMut::from(LINE), EVENT_COUNT))
            .map(|payload| Ok(EncodedEvent::new(payload, 0, JsonSize::zero())));

        sink.sink_map_err(drop).send_all(&mut events).await.unwrap();

        // The sink was consumed above, so this is the only remaining handle.
        let requests = Arc::try_unwrap(captured).unwrap().into_inner().unwrap();

        assert!(requests.len() > 1);
        let compressed_size: usize = requests.iter().map(|request| request.len()).sum();
        assert!(compressed_size < 80_000);

        let decompressed = requests.into_iter().flat_map(|request| {
            let mut plain = Vec::new();
            MultiGzDecoder::new(request.reader())
                .read_to_end(&mut plain)
                .unwrap();
            plain
        });
        let expected = std::iter::repeat_n(LINE.as_bytes().to_vec(), EVENT_COUNT).flatten();

        assert!(decompressed.eq(expected));
    }
}