// vector/sinks/util/buffer/mod.rs

1use std::io::Write;
2
3use bytes::{BufMut, BytesMut};
4use flate2::write::{GzEncoder, ZlibEncoder};
5
6use super::{
7    batch::{err_event_too_large, Batch, BatchSize, PushResult},
8    snappy::SnappyEncoder,
9    zstd::ZstdEncoder,
10};
11
12pub mod compression;
13pub mod json;
14pub mod metrics;
15pub mod partition;
16pub mod vec;
17
18pub use compression::Compression;
19pub use partition::{Partition, PartitionBuffer, PartitionInnerBuffer};
20
/// An in-memory byte buffer that accumulates encoded events into a batch,
/// optionally compressing them as they are written.
#[derive(Debug)]
pub struct Buffer {
    // Lazily-initialized writer; stays `None` until the first `push`.
    inner: Option<InnerBuffer>,
    // Number of events pushed into this batch so far.
    num_items: usize,
    // Running count of *uncompressed* bytes written; the compressed size is
    // not observable until the encoder is finished.
    num_bytes: usize,
    // Byte/event limits used to decide when the batch is full.
    settings: BatchSize<Self>,
    // Compression algorithm (and level) applied to pushed bytes.
    compression: Compression,
}
29
/// The concrete writer backing a [`Buffer`], selected by [`Compression`].
///
/// Every variant ultimately writes into an in-memory `BytesMut`.
#[derive(Debug)]
pub enum InnerBuffer {
    /// Uncompressed pass-through writer.
    Plain(bytes::buf::Writer<BytesMut>),
    /// Gzip-compressed writer.
    Gzip(GzEncoder<bytes::buf::Writer<BytesMut>>),
    /// Zlib-compressed writer.
    Zlib(ZlibEncoder<bytes::buf::Writer<BytesMut>>),
    /// Zstandard-compressed writer.
    Zstd(ZstdEncoder<bytes::buf::Writer<BytesMut>>),
    /// Snappy-compressed writer.
    Snappy(SnappyEncoder<bytes::buf::Writer<BytesMut>>),
}
38
39impl Buffer {
40    pub const fn new(settings: BatchSize<Self>, compression: Compression) -> Self {
41        Self {
42            inner: None,
43            num_items: 0,
44            num_bytes: 0,
45            settings,
46            compression,
47        }
48    }
49
50    fn buffer(&mut self) -> &mut InnerBuffer {
51        let bytes = self.settings.bytes;
52        let compression = self.compression;
53        self.inner.get_or_insert_with(|| {
54            let writer = BytesMut::with_capacity(bytes).writer();
55            match compression {
56                Compression::None => InnerBuffer::Plain(writer),
57                Compression::Gzip(level) => {
58                    InnerBuffer::Gzip(GzEncoder::new(writer, level.as_flate2()))
59                }
60                Compression::Zlib(level) => {
61                    InnerBuffer::Zlib(ZlibEncoder::new(writer, level.as_flate2()))
62                }
63                Compression::Zstd(level) => InnerBuffer::Zstd(
64                    ZstdEncoder::new(writer, level.into())
65                        .expect("Zstd encoder should not fail on init."),
66                ),
67                Compression::Snappy => InnerBuffer::Snappy(SnappyEncoder::new(writer)),
68            }
69        })
70    }
71
72    pub fn push(&mut self, input: &[u8]) {
73        self.num_items += 1;
74        match self.buffer() {
75            InnerBuffer::Plain(inner) => {
76                inner.write_all(input).unwrap();
77            }
78            InnerBuffer::Gzip(inner) => {
79                inner.write_all(input).unwrap();
80            }
81            InnerBuffer::Zlib(inner) => {
82                inner.write_all(input).unwrap();
83            }
84            InnerBuffer::Zstd(inner) => {
85                inner.write_all(input).unwrap();
86            }
87            InnerBuffer::Snappy(inner) => inner.write_all(input).unwrap(),
88        }
89    }
90
91    pub fn is_empty(&self) -> bool {
92        self.inner
93            .as_ref()
94            .map(|inner| match inner {
95                InnerBuffer::Plain(inner) => inner.get_ref().is_empty(),
96                InnerBuffer::Gzip(inner) => inner.get_ref().get_ref().is_empty(),
97                InnerBuffer::Zlib(inner) => inner.get_ref().get_ref().is_empty(),
98                InnerBuffer::Zstd(inner) => inner.get_ref().get_ref().is_empty(),
99                InnerBuffer::Snappy(inner) => inner.is_empty(),
100            })
101            .unwrap_or(true)
102    }
103}
104
105impl Batch for Buffer {
106    type Input = BytesMut;
107    type Output = BytesMut;
108
109    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
110        // The compressed encoders don't flush bytes immediately, so we
111        // can't track compressed sizes. Keep a running count of the
112        // number of bytes written instead.
113        let new_bytes = self.num_bytes + item.len();
114        if self.is_empty() && item.len() > self.settings.bytes {
115            err_event_too_large(item.len(), self.settings.bytes)
116        } else if self.num_items >= self.settings.events || new_bytes > self.settings.bytes {
117            PushResult::Overflow(item)
118        } else {
119            self.push(&item);
120            self.num_bytes = new_bytes;
121            PushResult::Ok(
122                self.num_items >= self.settings.events || new_bytes >= self.settings.bytes,
123            )
124        }
125    }
126
127    fn is_empty(&self) -> bool {
128        self.is_empty()
129    }
130
131    fn fresh(&self) -> Self {
132        Self::new(self.settings, self.compression)
133    }
134
135    fn finish(self) -> Self::Output {
136        match self.inner {
137            Some(InnerBuffer::Plain(inner)) => inner.into_inner(),
138            Some(InnerBuffer::Gzip(inner)) => inner
139                .finish()
140                .expect("This can't fail because the inner writer is a Vec")
141                .into_inner(),
142            Some(InnerBuffer::Zlib(inner)) => inner
143                .finish()
144                .expect("This can't fail because the inner writer is a Vec")
145                .into_inner(),
146            Some(InnerBuffer::Zstd(inner)) => inner
147                .finish()
148                .expect("This can't fail because the inner writer is a Vec")
149                .into_inner(),
150            Some(InnerBuffer::Snappy(inner)) => inner
151                .finish()
152                .expect("This can't fail because the inner writer is a Vec")
153                .into_inner(),
154            None => BytesMut::new(),
155        }
156    }
157
158    fn num_items(&self) -> usize {
159        self.num_items
160    }
161}
162
#[cfg(test)]
mod test {
    use std::{
        io::Read,
        sync::{Arc, Mutex},
    };

    use bytes::{Buf, BytesMut};
    use futures::{future, stream, SinkExt, StreamExt};
    use tokio::time::Duration;
    use vector_lib::json_size::JsonSize;

    use super::{Buffer, Compression};
    use crate::sinks::util::{BatchSettings, BatchSink, EncodedEvent};

    // End-to-end check: drive a gzip-compressed `Buffer` through a
    // `BatchSink`, assert that the input is split into multiple compressed
    // batches, and that decompressing them reproduces the original stream.
    #[tokio::test]
    async fn gzip() {
        use flate2::read::MultiGzDecoder;

        // Collects every request body the mock service receives.
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        // Mock service that records each finished batch and always succeeds.
        // The closure clones the `Arc` per call and drops it before returning,
        // so `Arc::try_unwrap` below still succeeds once the sink is dropped.
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        // Zero timeout: batches flush on size limits only.
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 100_000;
        batch_settings.size.events = 1_000;
        batch_settings.timeout = Duration::from_secs(0);

        let buffered = BatchSink::new(
            svc,
            Buffer::new(batch_settings.size, Compression::gzip_default()),
            batch_settings.timeout,
        );

        // 100k copies of a 71-byte line: far more than one batch's byte budget.
        let input = std::iter::repeat_n(
            BytesMut::from(
                "It's going down, I'm yelling timber, You better move, you better dance",
            ),
            100_000,
        );

        buffered
            .sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        // The sink has been consumed, so this is the only remaining strong ref.
        let output = Arc::try_unwrap(sent_requests)
            .unwrap()
            .into_inner()
            .unwrap();

        // Input exceeded one batch, so multiple requests must have been sent,
        // and the highly repetitive input should compress very well.
        assert!(output.len() > 1);
        assert!(output.iter().map(|o| o.len()).sum::<usize>() < 80_000);

        // Decompress each batch and concatenate the results.
        let decompressed = output.into_iter().flat_map(|batch| {
            let mut decompressed = vec![];
            MultiGzDecoder::new(batch.reader())
                .read_to_end(&mut decompressed)
                .unwrap();
            decompressed
        });

        // Round-trip: decompressed bytes equal the original repeated input.
        assert!(decompressed.eq(std::iter::repeat_n(
            b"It's going down, I'm yelling timber, You better move, you better dance".to_vec(),
            100_000
        )
        .flatten()));
    }
}
239}