// vector/sources/util/http/encoding.rs
use std::io::Read;
use bytes::{Buf, Bytes};
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use snap::raw::Decoder as SnappyDecoder;
use warp::http::StatusCode;
use super::error::ErrorMessage;
use crate::internal_events::HttpDecompressError;
pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
if let Some(encodings) = header {
for encoding in encodings.rsplit(',').map(str::trim) {
body = match encoding {
"identity" => body,
"gzip" => {
let mut decoded = Vec::new();
MultiGzDecoder::new(body.reader())
.read_to_end(&mut decoded)
.map_err(|error| handle_decode_error(encoding, error))?;
decoded.into()
}
"deflate" => {
let mut decoded = Vec::new();
ZlibDecoder::new(body.reader())
.read_to_end(&mut decoded)
.map_err(|error| handle_decode_error(encoding, error))?;
decoded.into()
}
"snappy" => SnappyDecoder::new()
.decompress_vec(&body)
.map_err(|error| handle_decode_error(encoding, error))?
.into(),
"zstd" => zstd::decode_all(body.reader())
.map_err(|error| handle_decode_error(encoding, error))?
.into(),
encoding => {
return Err(ErrorMessage::new(
StatusCode::UNSUPPORTED_MEDIA_TYPE,
format!("Unsupported encoding {}", encoding),
))
}
}
}
}
Ok(body)
}
/// Reports a failed decompression attempt and builds the error returned to the caller.
///
/// Emits an `HttpDecompressError` internal event carrying the coding name and the
/// underlying error, then returns a `422 Unprocessable Entity` [`ErrorMessage`]
/// that names the decoder which failed.
fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
    let message = format!("Failed decompressing payload with {} decoder.", encoding);
    emit!(HttpDecompressError {
        encoding,
        error: &error,
    });
    ErrorMessage::new(StatusCode::UNPROCESSABLE_ENTITY, message)
}