vector/sinks/util/
compressor.rs

1use std::{io, io::BufWriter};
2
3use bytes::{BufMut, BytesMut};
4use flate2::write::{GzEncoder, ZlibEncoder};
5
6use super::{snappy::SnappyEncoder, zstd::ZstdEncoder, Compression};
7
/// Capacity, in bytes, of the `BufWriter` placed in front of the gzip encoder.
const GZIP_INPUT_BUFFER_CAPACITY: usize = 4_096;
/// Capacity, in bytes, of the `BufWriter` placed in front of the zlib encoder.
const ZLIB_INPUT_BUFFER_CAPACITY: usize = 4_096;

/// Initial capacity, in bytes, of the `BytesMut` output buffer that every writer variant
/// ultimately writes into.
const OUTPUT_BUFFER_CAPACITY: usize = 1_024;
12
13enum Writer {
14    Plain(bytes::buf::Writer<BytesMut>),
15    Gzip(BufWriter<GzEncoder<bytes::buf::Writer<BytesMut>>>),
16    Zlib(BufWriter<ZlibEncoder<bytes::buf::Writer<BytesMut>>>),
17    Zstd(ZstdEncoder<bytes::buf::Writer<BytesMut>>),
18    Snappy(SnappyEncoder<bytes::buf::Writer<BytesMut>>),
19}
20
21impl Writer {
22    pub fn get_ref(&self) -> &BytesMut {
23        match self {
24            Writer::Plain(inner) => inner.get_ref(),
25            Writer::Gzip(inner) => inner.get_ref().get_ref().get_ref(),
26            Writer::Zlib(inner) => inner.get_ref().get_ref().get_ref(),
27            Writer::Zstd(inner) => inner.get_ref().get_ref(),
28            Writer::Snappy(inner) => inner.get_ref().get_ref(),
29        }
30    }
31
32    pub fn into_inner(self) -> BytesMut {
33        match self {
34            Writer::Plain(writer) => writer,
35            Writer::Gzip(writer) => writer
36                .into_inner()
37                .expect("BufWriter writer should not fail to finish")
38                .finish()
39                .expect("gzip writer should not fail to finish"),
40            Writer::Zlib(writer) => writer
41                .into_inner()
42                .expect("BufWriter writer should not fail to finish")
43                .finish()
44                .expect("zlib writer should not fail to finish"),
45            Writer::Zstd(writer) => writer
46                .finish()
47                .expect("zstd writer should not fail to finish"),
48            Writer::Snappy(writer) => writer
49                .finish()
50                .expect("snappy writer should not fail to finish"),
51        }
52        .into_inner()
53    }
54
55    pub fn finish(self) -> io::Result<BytesMut> {
56        let buf = match self {
57            Writer::Plain(writer) => writer,
58            Writer::Gzip(writer) => writer.into_inner()?.finish()?,
59            Writer::Zlib(writer) => writer.into_inner()?.finish()?,
60            Writer::Zstd(writer) => writer.finish()?,
61            Writer::Snappy(writer) => writer.finish()?,
62        }
63        .into_inner();
64
65        Ok(buf)
66    }
67}
68
69impl From<Compression> for Writer {
70    fn from(compression: Compression) -> Self {
71        let writer = BytesMut::with_capacity(OUTPUT_BUFFER_CAPACITY).writer();
72        match compression {
73            Compression::None => Writer::Plain(writer),
74            // Buffering writes to the underlying Encoder writer
75            // to avoid Vec-trashing and expensive memset syscalls.
76            // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152
77            Compression::Gzip(level) => Writer::Gzip(BufWriter::with_capacity(
78                GZIP_INPUT_BUFFER_CAPACITY,
79                GzEncoder::new(writer, level.as_flate2()),
80            )),
81            // Buffering writes to the underlying Encoder writer
82            // to avoid Vec-trashing and expensive memset syscalls.
83            // https://github.com/rust-lang/flate2-rs/issues/395#issuecomment-1975088152
84            Compression::Zlib(level) => Writer::Zlib(BufWriter::with_capacity(
85                ZLIB_INPUT_BUFFER_CAPACITY,
86                ZlibEncoder::new(writer, level.as_flate2()),
87            )),
88            Compression::Zstd(level) => {
89                let encoder = ZstdEncoder::new(writer, level.into())
90                    .expect("Zstd encoder should not fail on init.");
91                Writer::Zstd(encoder)
92            }
93            Compression::Snappy => Writer::Snappy(SnappyEncoder::new(writer)),
94        }
95    }
96}
97
98impl io::Write for Writer {
99    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
100        #[allow(clippy::disallowed_methods)] // Caller handles the result of `write`.
101        match self {
102            Writer::Plain(inner_buf) => inner_buf.write(buf),
103            Writer::Gzip(writer) => writer.write(buf),
104            Writer::Zlib(writer) => writer.write(buf),
105            Writer::Zstd(writer) => writer.write(buf),
106            Writer::Snappy(writer) => writer.write(buf),
107        }
108    }
109
110    fn flush(&mut self) -> io::Result<()> {
111        match self {
112            Writer::Plain(writer) => writer.flush(),
113            Writer::Gzip(writer) => writer.flush(),
114            Writer::Zlib(writer) => writer.flush(),
115            Writer::Zstd(writer) => writer.flush(),
116            Writer::Snappy(writer) => writer.flush(),
117        }
118    }
119}
120
/// Simple compressor implementation based on [`Compression`].
///
/// Users can acquire a `Compressor` via [`Compressor::from`] based on the desired compression scheme.
pub struct Compressor {
    // The scheme this compressor was built from; consulted by `is_compressed`.
    compression: Compression,
    // The writer that performs the actual (possibly pass-through) encoding.
    inner: Writer,
}
128
impl Compressor {
    /// Gets a shared reference to the underlying buffer.
    ///
    /// The buffer is borrowed immutably; it cannot be modified through the returned reference.
    // NOTE(review): for the buffered gzip/zlib writers, input that has not yet been pushed
    // through to the output buffer is presumably not visible here — confirm before relying
    // on the length of this buffer mid-stream.
    pub fn get_ref(&self) -> &BytesMut {
        self.inner.get_ref()
    }

    /// Gets whether or not this compressor will actually compress the input.
    ///
    /// While it may be counterintuitive for "compression" to not compress, this is simply a
    /// consequence of designing a single type that may or may not compress so that we can avoid
    /// having to box writers at a higher-level.
    ///
    /// Some callers can benefit from knowing whether or not compression is actually taking place,
    /// as different size limitations may come into play.
    pub const fn is_compressed(&self) -> bool {
        self.compression.is_compressed()
    }

    /// Consumes the compressor, returning the internal buffer used by the compressor.
    ///
    /// # Errors
    ///
    /// If the compressor encounters an I/O error while finalizing the payload, an error
    /// variant will be returned.
    pub fn finish(self) -> io::Result<BytesMut> {
        self.inner.finish()
    }

    /// Consumes the compressor, returning the internal buffer used by the compressor.
    ///
    /// This is the panicking counterpart of `finish`.
    ///
    /// # Panics
    ///
    /// Panics if finalizing the compressor encounters an I/O error.  This should generally only be
    /// possible when the system is out of memory and allocations cannot be performed to write any
    /// footer/checksum data.
    ///
    /// Consider using `finish` if catching these scenarios is important.
    pub fn into_inner(self) -> BytesMut {
        self.inner.into_inner()
    }
}
170
impl io::Write for Compressor {
    /// Writes `buf` to the inner writer, returning the number of bytes it accepted.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        #[allow(clippy::disallowed_methods)] // Caller handles the result of `write`.
        self.inner.write(buf)
    }

    /// Flushes the inner writer.
    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}
181
182impl From<Compression> for Compressor {
183    fn from(compression: Compression) -> Self {
184        Compressor {
185            compression,
186            inner: compression.into(),
187        }
188    }
189}