vector/sources/util/http/
encoding.rs1use std::io::Read;
2
3use bytes::{Buf, Bytes};
4use flate2::read::{MultiGzDecoder, ZlibDecoder};
5use snap::raw::Decoder as SnappyDecoder;
6use warp::http::StatusCode;
7
8use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
9
10pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
14 if let Some(encodings) = header {
15 for encoding in encodings.rsplit(',').map(str::trim) {
16 body = match encoding {
17 "identity" => body,
18 "gzip" => {
19 let mut decoded = Vec::new();
20 MultiGzDecoder::new(body.reader())
21 .read_to_end(&mut decoded)
22 .map_err(|error| emit_decompress_error(encoding, error))?;
23 decoded.into()
24 }
25 "deflate" => {
26 let mut decoded = Vec::new();
27 ZlibDecoder::new(body.reader())
28 .read_to_end(&mut decoded)
29 .map_err(|error| emit_decompress_error(encoding, error))?;
30 decoded.into()
31 }
32 "snappy" => SnappyDecoder::new()
33 .decompress_vec(&body)
34 .map_err(|error| emit_decompress_error(encoding, error))?
35 .into(),
36 "zstd" => zstd::decode_all(body.reader())
37 .map_err(|error| emit_decompress_error(encoding, error))?
38 .into(),
39 encoding => {
40 return Err(ErrorMessage::new(
41 StatusCode::UNSUPPORTED_MEDIA_TYPE,
42 format!("Unsupported encoding {encoding}"),
43 ));
44 }
45 }
46 }
47 }
48
49 Ok(body)
50}
51
52pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
53 emit!(HttpDecompressError {
54 encoding,
55 error: &error
56 });
57 ErrorMessage::new(
58 StatusCode::UNPROCESSABLE_ENTITY,
59 format!("Failed decompressing payload with {encoding} decoder."),
60 )
61}