use std::{cell::RefCell, collections::BTreeSet, fmt};
use indexmap::IndexMap;
use serde::{de, ser};
use serde_json::Value;
use vector_lib::configurable::attributes::CustomAttribute;
use vector_lib::configurable::{
schema::{
apply_base_metadata, generate_const_string_schema, generate_enum_schema,
generate_one_of_schema, generate_struct_schema, get_or_generate_schema, SchemaGenerator,
SchemaObject,
},
Configurable, GenerateError, Metadata, ToValue,
};
use crate::sinks::util::zstd::ZstdCompressionLevel;
/// Compression configuration.
///
/// Selects a compression algorithm and, for the algorithms that support one, a
/// [`CompressionLevel`]. The default value is [`Compression::None`].
#[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
pub enum Compression {
/// No compression (the default variant).
#[derivative(Default)]
None,
/// Gzip compression at the given level.
Gzip(CompressionLevel),
/// Zlib compression at the given level.
Zlib(CompressionLevel),
/// Zstandard compression at the given level.
Zstd(CompressionLevel),
/// Snappy compression; carries no level.
Snappy,
}
impl Compression {
    /// Returns `true` for every variant except [`Compression::None`].
    pub const fn is_compressed(&self) -> bool {
        !matches!(self, Self::None)
    }

    /// Gzip at the default compression level.
    pub const fn gzip_default() -> Compression {
        Self::Gzip(CompressionLevel::const_default())
    }

    /// Zlib at the default compression level.
    pub const fn zlib_default() -> Compression {
        Self::Zlib(CompressionLevel::const_default())
    }

    /// Zstandard at the default compression level.
    pub const fn zstd_default() -> Compression {
        Self::Zstd(CompressionLevel::const_default())
    }

    /// Name of this algorithm as a content-encoding token, or `None` when uncompressed.
    ///
    /// Zlib is reported as `"deflate"`.
    pub const fn content_encoding(self) -> Option<&'static str> {
        match self {
            Self::Gzip(_) => Some("gzip"),
            Self::Zlib(_) => Some("deflate"),
            Self::Zstd(_) => Some("zstd"),
            Self::Snappy => Some("snappy"),
            Self::None => None,
        }
    }

    /// Name of this algorithm as an accept-encoding token, or `None` when uncompressed.
    ///
    /// The match is exhaustive so that adding a variant forces a decision here.
    pub const fn accept_encoding(self) -> Option<&'static str> {
        match self {
            Self::None => None,
            Self::Gzip(_) => Some("gzip"),
            Self::Zlib(_) => Some("deflate"),
            Self::Zstd(_) => Some("zstd"),
            Self::Snappy => Some("snappy"),
        }
    }

    /// File extension (without a leading dot) associated with this algorithm.
    pub const fn extension(self) -> &'static str {
        match self {
            Self::Gzip(_) => "log.gz",
            Self::Zlib(_) => "log.zz",
            Self::Zstd(_) => "log.zst",
            Self::Snappy => "log.snappy",
            Self::None => "log",
        }
    }

    /// Largest numeric level accepted for this algorithm.
    ///
    /// Algorithms without a tunable level report `0`.
    pub const fn max_compression_level_val(self) -> u32 {
        match self {
            Self::None | Self::Snappy => 0,
            Self::Gzip(_) | Self::Zlib(_) => 9,
            Self::Zstd(_) => 21,
        }
    }

    /// The level carried by this variant, or [`CompressionLevel::None`] for the variants
    /// that do not carry one.
    pub const fn compression_level(self) -> CompressionLevel {
        match self {
            Self::Gzip(level) => level,
            Self::Zlib(level) => level,
            Self::Zstd(level) => level,
            Self::None | Self::Snappy => CompressionLevel::None,
        }
    }
}
/// Renders the algorithm name; algorithms that carry a level append the resolved level in
/// parentheses, e.g. `gzip(<level>)`.
impl fmt::Display for Compression {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            // Level-less algorithms print just their name.
            Self::None => f.write_str("none"),
            Self::Snappy => f.write_str("snappy"),
            // Gzip and zlib resolve the level through flate2.
            Self::Gzip(level) => write!(f, "gzip({})", level.as_flate2().level()),
            Self::Zlib(level) => write!(f, "zlib({})", level.as_flate2().level()),
            // Zstd resolves the level through its own conversion type.
            Self::Zstd(level) => write!(f, "zstd({})", ZstdCompressionLevel::from(*level)),
        }
    }
}
impl<'de> de::Deserialize<'de> for Compression {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: de::Deserializer<'de>,
{
struct StringOrMap;
impl<'de> de::Visitor<'de> for StringOrMap {
type Value = Compression;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("string or map")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
match s {
"none" => Ok(Compression::None),
"gzip" => Ok(Compression::gzip_default()),
"zlib" => Ok(Compression::zlib_default()),
"zstd" => Ok(Compression::zstd_default()),
"snappy" => Ok(Compression::Snappy),
_ => Err(de::Error::invalid_value(
de::Unexpected::Str(s),
&r#""none" or "gzip" or "zlib" or "zstd""#,
)),
}
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: de::MapAccess<'de>,
{
let mut algorithm = None;
let mut level = None;
while let Some(key) = map.next_key::<String>()? {
match key.as_str() {
"algorithm" => {
if algorithm.is_some() {
return Err(de::Error::duplicate_field("algorithm"));
}
algorithm = Some(map.next_value::<String>()?);
}
"level" => {
if level.is_some() {
return Err(de::Error::duplicate_field("level"));
}
level = Some(map.next_value::<CompressionLevel>()?);
}
_ => return Err(de::Error::unknown_field(&key, &["algorithm", "level"])),
};
}
let compression = match algorithm
.ok_or_else(|| de::Error::missing_field("algorithm"))?
.as_str()
{
"none" => match level {
Some(_) => Err(de::Error::unknown_field("level", &[])),
None => Ok(Compression::None),
},
"gzip" => Ok(Compression::Gzip(level.unwrap_or_default())),
"zlib" => Ok(Compression::Zlib(level.unwrap_or_default())),
"zstd" => Ok(Compression::Zstd(level.unwrap_or_default())),
"snappy" => match level {
Some(_) => Err(de::Error::unknown_field("level", &[])),
None => Ok(Compression::Snappy),
},
algorithm => Err(de::Error::unknown_variant(
algorithm,
&["none", "gzip", "zlib", "zstd", "snappy"],
)),
}?;
if let CompressionLevel::Val(level) = compression.compression_level() {
let max_level = compression.max_compression_level_val();
if level > max_level {
let msg = std::format!(
"invalid value `{}`, expected value in range [0, {}]",
level,
max_level
);
return Err(de::Error::custom(msg));
}
}
Ok(compression)
}
}
deserializer.deserialize_any(StringOrMap)
}
}
/// Serializes to the shortest form that deserializes back to the same value: a bare
/// algorithm name when there is no level or the level is the default, otherwise a map with
/// `algorithm` and `level` entries.
impl ser::Serialize for Compression {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: ser::Serializer,
    {
        use ser::SerializeMap;

        // Level-less variants are always a plain string; the rest share one code path.
        let (algorithm, level) = match self {
            Compression::None => return serializer.serialize_str("none"),
            Compression::Snappy => return serializer.serialize_str("snappy"),
            Compression::Gzip(level) => ("gzip", level),
            Compression::Zlib(level) => ("zlib", level),
            Compression::Zstd(level) => ("zstd", level),
        };

        if *level == CompressionLevel::Default {
            return serializer.serialize_str(algorithm);
        }

        let mut map = serializer.serialize_map(None)?;
        map.serialize_entry("algorithm", algorithm)?;
        map.serialize_entry("level", level)?;
        map.end()
    }
}
/// Property name of the algorithm field in the object form of the compression schema.
pub const ALGORITHM_NAME: &str = "algorithm";
/// Property name of the level field in the object form of the compression schema.
pub const LEVEL_NAME: &str = "level";
/// Custom-attribute key under which `generate_string_schema` records the original
/// (non-lowercased) name passed to it.
pub const LOGICAL_NAME: &str = "logical_name";
/// Custom-attribute key used to declare the enum tagging mode in schema metadata.
pub const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";
/// Builds a constant-string schema for one named option.
///
/// The schema's constant value is `logical_name` lowercased. The metadata carries
/// `description`, the optional `title`, and a `logical_name` custom attribute holding the
/// name exactly as it was passed in.
pub fn generate_string_schema(
    logical_name: &str,
    title: Option<&'static str>,
    description: &'static str,
) -> SchemaObject {
    // Assemble the metadata first, then attach it to the freshly generated schema.
    let mut metadata = Metadata::with_description(description);
    if let Some(title) = title {
        metadata.set_title(title);
    }
    metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));

    let mut schema = generate_const_string_schema(logical_name.to_lowercase());
    apply_base_metadata(&mut schema, metadata);
    schema
}
impl Configurable for Compression {
fn referenceable_name() -> Option<&'static str> {
Some(std::any::type_name::<Self>())
}
fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_title("Compression configuration.");
metadata.set_description("All compression algorithms use the default compression level unless otherwise specified.");
metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
metadata.add_custom_attribute(CustomAttribute::flag("docs::advanced"));
metadata
}
fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
let mut string_metadata = Metadata::with_description("Compression algorithm.");
string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
let none_string_subschema = generate_string_schema("None", None, "No compression.");
let gzip_string_subschema = generate_string_schema(
"Gzip",
Some("[Gzip][gzip] compression."),
"[gzip]: https://www.gzip.org/",
);
let zlib_string_subschema = generate_string_schema(
"Zlib",
Some("[Zlib][zlib] compression."),
"[zlib]: https://zlib.net/",
);
let zstd_string_subschema = generate_string_schema(
"Zstd",
Some("[Zstandard][zstd] compression."),
"[zstd]: https://facebook.github.io/zstd/",
);
let snappy_string_subschema = generate_string_schema(
"Snappy",
Some("[Snappy][snappy] compression."),
"[snappy]: https://github.com/google/snappy/blob/main/docs/README.md",
);
let mut all_string_oneof_subschema = generate_one_of_schema(&[
none_string_subschema,
gzip_string_subschema,
zlib_string_subschema,
zstd_string_subschema,
snappy_string_subschema,
]);
apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
let compression_level_schema =
get_or_generate_schema(&CompressionLevel::as_configurable_ref(), gen, None)?;
let mut required = BTreeSet::new();
required.insert(ALGORITHM_NAME.to_string());
let mut properties = IndexMap::new();
properties.insert(
ALGORITHM_NAME.to_string(),
all_string_oneof_subschema.clone(),
);
properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
let mut full_subschema = generate_struct_schema(properties, required, None);
let mut full_metadata =
Metadata::with_description("Compression algorithm and compression level.");
full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
apply_base_metadata(&mut full_subschema, full_metadata);
Ok(generate_one_of_schema(&[
all_string_oneof_subschema,
full_subschema,
]))
}
}
// Produces the JSON representation by going through this type's `Serialize` implementation.
impl ToValue for Compression {
/// Converts the compression settings to a JSON value.
///
/// # Panics
///
/// Panics if `serde_json::to_value` reports an error.
fn to_value(&self) -> Value {
serde_json::to_value(self).expect("Could not convert compression settings to JSON")
}
}
/// Compression level.
///
/// Either one of the keyword levels or an explicit numeric value.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CompressionLevel {
/// The `none` keyword level.
None,
/// The `default` keyword level; this is the `Default` value of the type.
#[default]
Default,
/// The `best` keyword level.
Best,
/// The `fast` keyword level.
Fast,
/// An explicit numeric level.
Val(u32),
}
impl CompressionLevel {
pub const fn const_default() -> Self {
CompressionLevel::Default
}
pub fn as_flate2(self) -> flate2::Compression {
match self {
CompressionLevel::None => flate2::Compression::none(),
CompressionLevel::Default => flate2::Compression::default(),
CompressionLevel::Best => flate2::Compression::best(),
CompressionLevel::Fast => flate2::Compression::fast(),
CompressionLevel::Val(level) => flate2::Compression::new(level),
}
}
}
/// Accepts one of the keywords `"none"`, `"fast"`, `"default"`, `"best"`, or an integer that
/// fits in a `u32`.
impl<'de> de::Deserialize<'de> for CompressionLevel {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        struct NumberOrString;

        impl<'de> de::Visitor<'de> for NumberOrString {
            type Value = CompressionLevel;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                f.write_str("unsigned number or string")
            }

            /// Keyword levels.
            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                match s {
                    "none" => Ok(CompressionLevel::None),
                    "fast" => Ok(CompressionLevel::Fast),
                    "default" => Ok(CompressionLevel::Default),
                    "best" => Ok(CompressionLevel::Best),
                    unknown => Err(E::invalid_value(
                        de::Unexpected::Str(unknown),
                        &r#""none", "fast", "best" or "default""#,
                    )),
                }
            }

            /// Unsigned integers: rejected only when they do not fit in a `u32`.
            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                match u32::try_from(v) {
                    Ok(level) => Ok(CompressionLevel::Val(level)),
                    Err(err) => Err(E::custom(format!(
                        "unsigned integer could not be converted to u32: {}",
                        err
                    ))),
                }
            }

            /// Signed integers: negative or oversized values fail the `u32` conversion.
            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
            where
                E: de::Error,
            {
                match u32::try_from(v) {
                    Ok(level) => Ok(CompressionLevel::Val(level)),
                    Err(err) => Err(E::custom(format!(
                        "integer could not be converted to u32: {}",
                        err
                    ))),
                }
            }
        }

        deserializer.deserialize_any(NumberOrString)
    }
}
/// Keyword levels serialize as their lowercase name; a numeric level serializes as an
/// unsigned integer.
impl ser::Serialize for CompressionLevel {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: ser::Serializer,
    {
        let keyword = match *self {
            // The numeric case is the only one that is not a string.
            CompressionLevel::Val(level) => {
                return serializer.serialize_u64(u64::from(level));
            }
            CompressionLevel::None => "none",
            CompressionLevel::Default => "default",
            CompressionLevel::Best => "best",
            CompressionLevel::Fast => "fast",
        };
        serializer.serialize_str(keyword)
    }
}
impl Configurable for CompressionLevel {
fn referenceable_name() -> Option<&'static str> {
Some(std::any::type_name::<Self>())
}
fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("Compression level.");
metadata
}
fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
let string_consts = ["none", "fast", "best", "default"]
.iter()
.map(|s| serde_json::Value::from(*s));
let level_consts = (0u32..=21).map(serde_json::Value::from);
let valid_values = string_consts.chain(level_consts).collect();
Ok(generate_enum_schema(valid_values))
}
}
// Produces the JSON representation by going through this type's `Serialize` implementation.
impl ToValue for CompressionLevel {
/// Converts the compression level to a JSON value.
///
/// # Panics
///
/// Panics if `serde_json::to_value` reports an error.
fn to_value(&self) -> Value {
serde_json::to_value(self).expect("Could not convert compression level to JSON")
}
}
#[cfg(test)]
mod test {
    use super::{Compression, CompressionLevel};

    /// Checks both accepted JSON shapes (bare string and map) and the exact error text for
    /// each rejected input.
    #[test]
    fn deserialization_json() {
        let fixtures_valid = [
            (r#""none""#, Compression::None),
            (r#""gzip""#, Compression::Gzip(CompressionLevel::default())),
            (r#""zlib""#, Compression::Zlib(CompressionLevel::default())),
            (r#""zstd""#, Compression::Zstd(CompressionLevel::default())),
            (r#""snappy""#, Compression::Snappy),
            (r#"{"algorithm": "none"}"#, Compression::None),
            (r#"{"algorithm": "snappy"}"#, Compression::Snappy),
            (
                r#"{"algorithm": "gzip"}"#,
                Compression::Gzip(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "gzip", "level": "best"}"#,
                Compression::Gzip(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "gzip", "level": 8}"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zlib"}"#,
                Compression::Zlib(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zlib", "level": "best"}"#,
                Compression::Zlib(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "zlib", "level": 8}"#,
                Compression::Zlib(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zstd"}"#,
                Compression::Zstd(CompressionLevel::default()),
            ),
            // 21 is the largest numeric level accepted for zstd.
            (
                r#"{"algorithm": "zstd", "level": 21}"#,
                Compression::Zstd(CompressionLevel::Val(21)),
            ),
        ];
        for (sources, result) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(sources);
            assert_eq!(deserialized.expect("valid source"), *result);
        }

        let fixtures_invalid = [
            (
                r"42",
                r"invalid type: integer `42`, expected string or map at line 1 column 2",
            ),
            (
                r#""b42""#,
                r#"invalid value: string "b42", expected "none" or "gzip" or "zlib" or "zstd" at line 1 column 5"#,
            ),
            (
                r#"{"algorithm": "b42"}"#,
                r"unknown variant `b42`, expected one of `none`, `gzip`, `zlib`, `zstd`, `snappy` at line 1 column 20",
            ),
            (
                r#"{"algorithm": "none", "level": "default"}"#,
                r"unknown field `level`, there are no fields at line 1 column 41",
            ),
            (
                r#"{"algorithm": "gzip", "level": -1}"#,
                r"integer could not be converted to u32: out of range integral type conversion attempted at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "good"}"#,
                r#"invalid value: string "good", expected "none", "fast", "best" or "default" at line 1 column 37"#,
            ),
            (
                r#"{"algorithm": "gzip", "level": {}}"#,
                r"invalid type: map, expected unsigned number or string at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "default", "key": 42}"#,
                r"unknown field `key`, expected `algorithm` or `level` at line 1 column 47",
            ),
            (
                r#"{"algorithm": "gzip", "level": 10}"#,
                r"invalid value `10`, expected value in range [0, 9] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "zstd", "level": 22}"#,
                r"invalid value `22`, expected value in range [0, 21] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "snappy", "level": 3}"#,
                r"unknown field `level`, there are no fields at line 1 column 35",
            ),
        ];
        for (source, result) in fixtures_invalid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            let error = deserialized.expect_err("invalid source");
            assert_eq!(error.to_string().as_str(), *result);
        }
    }

    /// The map form must also work from TOML, where integers arrive as signed values.
    #[test]
    fn deserialization_toml() {
        let fixtures_valid = [
            (
                r#"algorithm = "gzip"
level = 8"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
        ];
        for (sources, result) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = toml::from_str(sources);
            assert_eq!(deserialized.expect("valid source"), *result);
        }
    }

    /// Serializing and then deserializing must give back the original value, not merely
    /// succeed.
    #[test]
    fn from_and_to_value() {
        let fixtures_valid = [
            Compression::None,
            Compression::Gzip(CompressionLevel::default()),
            Compression::Gzip(CompressionLevel::Val(7)),
            Compression::Zlib(CompressionLevel::Best),
            Compression::Zlib(CompressionLevel::Val(7)),
            Compression::Zstd(CompressionLevel::Val(6)),
            Compression::Zstd(CompressionLevel::default()),
            Compression::Zstd(CompressionLevel::Best),
            Compression::Zstd(CompressionLevel::Fast),
            Compression::Snappy,
        ];
        for v in fixtures_valid {
            let value = serde_json::to_value(v).unwrap();
            let round_tripped = serde_json::from_value::<Compression>(value).unwrap();
            assert_eq!(round_tripped, v);
        }
    }
}