vector/sinks/util/
compressor.rs1use std::{io, io::BufWriter};
2
3use bytes::{BufMut, BytesMut};
4use flate2::write::{GzEncoder, ZlibEncoder};
5
6use super::{snappy::SnappyEncoder, zstd::ZstdEncoder, Compression};
7
/// Capacity, in bytes, of the `BufWriter` placed in front of the gzip encoder.
const GZIP_INPUT_BUFFER_CAPACITY: usize = 4 * 1024;
/// Capacity, in bytes, of the `BufWriter` placed in front of the zlib encoder.
const ZLIB_INPUT_BUFFER_CAPACITY: usize = 4 * 1024;

/// Initial capacity, in bytes, of the in-memory buffer that receives the output.
const OUTPUT_BUFFER_CAPACITY: usize = 1024;
12
13enum Writer {
14 Plain(bytes::buf::Writer<BytesMut>),
15 Gzip(BufWriter<GzEncoder<bytes::buf::Writer<BytesMut>>>),
16 Zlib(BufWriter<ZlibEncoder<bytes::buf::Writer<BytesMut>>>),
17 Zstd(ZstdEncoder<bytes::buf::Writer<BytesMut>>),
18 Snappy(SnappyEncoder<bytes::buf::Writer<BytesMut>>),
19}
20
21impl Writer {
22 pub fn get_ref(&self) -> &BytesMut {
23 match self {
24 Writer::Plain(inner) => inner.get_ref(),
25 Writer::Gzip(inner) => inner.get_ref().get_ref().get_ref(),
26 Writer::Zlib(inner) => inner.get_ref().get_ref().get_ref(),
27 Writer::Zstd(inner) => inner.get_ref().get_ref(),
28 Writer::Snappy(inner) => inner.get_ref().get_ref(),
29 }
30 }
31
32 pub fn into_inner(self) -> BytesMut {
33 match self {
34 Writer::Plain(writer) => writer,
35 Writer::Gzip(writer) => writer
36 .into_inner()
37 .expect("BufWriter writer should not fail to finish")
38 .finish()
39 .expect("gzip writer should not fail to finish"),
40 Writer::Zlib(writer) => writer
41 .into_inner()
42 .expect("BufWriter writer should not fail to finish")
43 .finish()
44 .expect("zlib writer should not fail to finish"),
45 Writer::Zstd(writer) => writer
46 .finish()
47 .expect("zstd writer should not fail to finish"),
48 Writer::Snappy(writer) => writer
49 .finish()
50 .expect("snappy writer should not fail to finish"),
51 }
52 .into_inner()
53 }
54
55 pub fn finish(self) -> io::Result<BytesMut> {
56 let buf = match self {
57 Writer::Plain(writer) => writer,
58 Writer::Gzip(writer) => writer.into_inner()?.finish()?,
59 Writer::Zlib(writer) => writer.into_inner()?.finish()?,
60 Writer::Zstd(writer) => writer.finish()?,
61 Writer::Snappy(writer) => writer.finish()?,
62 }
63 .into_inner();
64
65 Ok(buf)
66 }
67}
68
69impl From<Compression> for Writer {
70 fn from(compression: Compression) -> Self {
71 let writer = BytesMut::with_capacity(OUTPUT_BUFFER_CAPACITY).writer();
72 match compression {
73 Compression::None => Writer::Plain(writer),
74 Compression::Gzip(level) => Writer::Gzip(BufWriter::with_capacity(
78 GZIP_INPUT_BUFFER_CAPACITY,
79 GzEncoder::new(writer, level.as_flate2()),
80 )),
81 Compression::Zlib(level) => Writer::Zlib(BufWriter::with_capacity(
85 ZLIB_INPUT_BUFFER_CAPACITY,
86 ZlibEncoder::new(writer, level.as_flate2()),
87 )),
88 Compression::Zstd(level) => {
89 let encoder = ZstdEncoder::new(writer, level.into())
90 .expect("Zstd encoder should not fail on init.");
91 Writer::Zstd(encoder)
92 }
93 Compression::Snappy => Writer::Snappy(SnappyEncoder::new(writer)),
94 }
95 }
96}
97
98impl io::Write for Writer {
99 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
100 #[allow(clippy::disallowed_methods)] match self {
102 Writer::Plain(inner_buf) => inner_buf.write(buf),
103 Writer::Gzip(writer) => writer.write(buf),
104 Writer::Zlib(writer) => writer.write(buf),
105 Writer::Zstd(writer) => writer.write(buf),
106 Writer::Snappy(writer) => writer.write(buf),
107 }
108 }
109
110 fn flush(&mut self) -> io::Result<()> {
111 match self {
112 Writer::Plain(writer) => writer.flush(),
113 Writer::Gzip(writer) => writer.flush(),
114 Writer::Zlib(writer) => writer.flush(),
115 Writer::Zstd(writer) => writer.flush(),
116 Writer::Snappy(writer) => writer.flush(),
117 }
118 }
119}
120
121pub struct Compressor {
125 compression: Compression,
126 inner: Writer,
127}
128
129impl Compressor {
130 pub fn get_ref(&self) -> &BytesMut {
132 self.inner.get_ref()
133 }
134
135 pub const fn is_compressed(&self) -> bool {
144 self.compression.is_compressed()
145 }
146
147 pub fn finish(self) -> io::Result<BytesMut> {
154 self.inner.finish()
155 }
156
157 pub fn into_inner(self) -> BytesMut {
167 self.inner.into_inner()
168 }
169}
170
171impl io::Write for Compressor {
172 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
173 #[allow(clippy::disallowed_methods)] self.inner.write(buf)
175 }
176
177 fn flush(&mut self) -> io::Result<()> {
178 self.inner.flush()
179 }
180}
181
182impl From<Compression> for Compressor {
183 fn from(compression: Compression) -> Self {
184 Compressor {
185 compression,
186 inner: compression.into(),
187 }
188 }
189}