vector/sources/util/http/
encoding.rs1use std::io::Read;
2
3use bytes::{Buf, Bytes};
4use flate2::read::{MultiGzDecoder, ZlibDecoder};
5use snap::raw::Decoder as SnappyDecoder;
6use warp::http::StatusCode;
7
8use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
9
10pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
11 if let Some(encodings) = header {
12 for encoding in encodings.rsplit(',').map(str::trim) {
13 body = match encoding {
14 "identity" => body,
15 "gzip" => {
16 let mut decoded = Vec::new();
17 MultiGzDecoder::new(body.reader())
18 .read_to_end(&mut decoded)
19 .map_err(|error| handle_decode_error(encoding, error))?;
20 decoded.into()
21 }
22 "deflate" => {
23 let mut decoded = Vec::new();
24 ZlibDecoder::new(body.reader())
25 .read_to_end(&mut decoded)
26 .map_err(|error| handle_decode_error(encoding, error))?;
27 decoded.into()
28 }
29 "snappy" => SnappyDecoder::new()
30 .decompress_vec(&body)
31 .map_err(|error| handle_decode_error(encoding, error))?
32 .into(),
33 "zstd" => zstd::decode_all(body.reader())
34 .map_err(|error| handle_decode_error(encoding, error))?
35 .into(),
36 encoding => {
37 return Err(ErrorMessage::new(
38 StatusCode::UNSUPPORTED_MEDIA_TYPE,
39 format!("Unsupported encoding {encoding}"),
40 ))
41 }
42 }
43 }
44 }
45
46 Ok(body)
47}
48
49fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
50 emit!(HttpDecompressError {
51 encoding,
52 error: &error
53 });
54 ErrorMessage::new(
55 StatusCode::UNPROCESSABLE_ENTITY,
56 format!("Failed decompressing payload with {encoding} decoder."),
57 )
58}