vector/sources/util/http/
encoding.rs

1use std::io::Read;
2
3use bytes::{Buf, Bytes};
4use flate2::read::{MultiGzDecoder, ZlibDecoder};
5use snap::raw::Decoder as SnappyDecoder;
6use warp::http::StatusCode;
7
8use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
9
10/// Decompresses the body based on the Content-Encoding header.
11///
12/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
13pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
14    if let Some(encodings) = header {
15        for encoding in encodings.rsplit(',').map(str::trim) {
16            body = match encoding {
17                "identity" => body,
18                "gzip" => {
19                    let mut decoded = Vec::new();
20                    MultiGzDecoder::new(body.reader())
21                        .read_to_end(&mut decoded)
22                        .map_err(|error| emit_decompress_error(encoding, error))?;
23                    decoded.into()
24                }
25                "deflate" => {
26                    let mut decoded = Vec::new();
27                    ZlibDecoder::new(body.reader())
28                        .read_to_end(&mut decoded)
29                        .map_err(|error| emit_decompress_error(encoding, error))?;
30                    decoded.into()
31                }
32                "snappy" => SnappyDecoder::new()
33                    .decompress_vec(&body)
34                    .map_err(|error| emit_decompress_error(encoding, error))?
35                    .into(),
36                "zstd" => zstd::decode_all(body.reader())
37                    .map_err(|error| emit_decompress_error(encoding, error))?
38                    .into(),
39                encoding => {
40                    return Err(ErrorMessage::new(
41                        StatusCode::UNSUPPORTED_MEDIA_TYPE,
42                        format!("Unsupported encoding {encoding}"),
43                    ));
44                }
45            }
46        }
47    }
48
49    Ok(body)
50}
51
52pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
53    emit!(HttpDecompressError {
54        encoding,
55        error: &error
56    });
57    ErrorMessage::new(
58        StatusCode::UNPROCESSABLE_ENTITY,
59        format!("Failed decompressing payload with {encoding} decoder."),
60    )
61}