vector/sources/util/http/
encoding.rs

1use std::io::Read;
2
3use bytes::{Buf, Bytes};
4use flate2::read::{MultiGzDecoder, ZlibDecoder};
5use snap::raw::Decoder as SnappyDecoder;
6use warp::http::StatusCode;
7
8use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
9
10pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
11    if let Some(encodings) = header {
12        for encoding in encodings.rsplit(',').map(str::trim) {
13            body = match encoding {
14                "identity" => body,
15                "gzip" => {
16                    let mut decoded = Vec::new();
17                    MultiGzDecoder::new(body.reader())
18                        .read_to_end(&mut decoded)
19                        .map_err(|error| handle_decode_error(encoding, error))?;
20                    decoded.into()
21                }
22                "deflate" => {
23                    let mut decoded = Vec::new();
24                    ZlibDecoder::new(body.reader())
25                        .read_to_end(&mut decoded)
26                        .map_err(|error| handle_decode_error(encoding, error))?;
27                    decoded.into()
28                }
29                "snappy" => SnappyDecoder::new()
30                    .decompress_vec(&body)
31                    .map_err(|error| handle_decode_error(encoding, error))?
32                    .into(),
33                "zstd" => zstd::decode_all(body.reader())
34                    .map_err(|error| handle_decode_error(encoding, error))?
35                    .into(),
36                encoding => {
37                    return Err(ErrorMessage::new(
38                        StatusCode::UNSUPPORTED_MEDIA_TYPE,
39                        format!("Unsupported encoding {encoding}"),
40                    ))
41                }
42            }
43        }
44    }
45
46    Ok(body)
47}
48
49fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
50    emit!(HttpDecompressError {
51        encoding,
52        error: &error
53    });
54    ErrorMessage::new(
55        StatusCode::UNPROCESSABLE_ENTITY,
56        format!("Failed decompressing payload with {encoding} decoder."),
57    )
58}