vector/sinks/util/buffer/
compression.rs

1use std::{cell::RefCell, collections::BTreeSet, fmt};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::{
7    Configurable, GenerateError, Metadata, ToValue,
8    attributes::CustomAttribute,
9    schema::{
10        SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
11        generate_enum_schema, generate_one_of_schema, generate_struct_schema,
12        get_or_generate_schema,
13    },
14};
15
16use crate::sinks::util::zstd::ZstdCompressionLevel;
17
18/// Compression configuration.
19#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
20pub enum Compression {
21    /// No compression.
22    #[default]
23    None,
24
25    /// [Gzip][gzip] compression.
26    ///
27    /// [gzip]: https://www.gzip.org/
28    Gzip(CompressionLevel),
29
30    /// [Zlib][zlib] compression.
31    ///
32    /// [zlib]: https://zlib.net/
33    Zlib(CompressionLevel),
34
35    /// [Zstandard][zstd] compression.
36    ///
37    /// [zstd]: https://facebook.github.io/zstd/
38    Zstd(CompressionLevel),
39
40    /// [Snappy][snappy] compression.
41    ///
42    /// [snappy]: https://github.com/google/snappy/blob/main/docs/README.md
43    Snappy,
44}
45
46impl Compression {
47    /// Gets whether or not this compression will actually compress the input.
48    ///
49    /// While it may be counterintuitive for "compression" to not compress, this is simply a
50    /// consequence of designing a single type that may or may not compress so that we can avoid
51    /// having to box writers at a higher-level.
52    ///
53    /// Some callers can benefit from knowing whether or not compression is actually taking place,
54    /// as different size limitations may come into play.
55    pub const fn is_compressed(&self) -> bool {
56        !matches!(self, Compression::None)
57    }
58
59    pub const fn gzip_default() -> Compression {
60        Compression::Gzip(CompressionLevel::const_default())
61    }
62
63    pub const fn zlib_default() -> Compression {
64        Compression::Zlib(CompressionLevel::const_default())
65    }
66
67    pub const fn zstd_default() -> Compression {
68        Compression::Zstd(CompressionLevel::const_default())
69    }
70
71    pub const fn content_encoding(self) -> Option<&'static str> {
72        match self {
73            Self::None => None,
74            Self::Gzip(_) => Some("gzip"),
75            Self::Zlib(_) => Some("deflate"),
76            Self::Zstd(_) => Some("zstd"),
77            Self::Snappy => Some("snappy"),
78        }
79    }
80
81    pub const fn accept_encoding(self) -> Option<&'static str> {
82        match self {
83            Self::Gzip(_) => Some("gzip"),
84            Self::Zlib(_) => Some("deflate"),
85            Self::Zstd(_) => Some("zstd"),
86            Self::Snappy => Some("snappy"),
87            _ => None,
88        }
89    }
90
91    pub const fn extension(self) -> &'static str {
92        match self {
93            Self::None => "log",
94            Self::Gzip(_) => "log.gz",
95            Self::Zlib(_) => "log.zz",
96            Self::Zstd(_) => "log.zst",
97            Self::Snappy => "log.snappy",
98        }
99    }
100
101    pub const fn max_compression_level_val(self) -> u32 {
102        match self {
103            Compression::None => 0,
104            Compression::Gzip(_) => 9,
105            Compression::Zlib(_) => 9,
106            Compression::Zstd(_) => 21,
107            Compression::Snappy => 0,
108        }
109    }
110
111    pub const fn compression_level(self) -> CompressionLevel {
112        match self {
113            Self::None | Self::Snappy => CompressionLevel::None,
114            Self::Gzip(level) | Self::Zlib(level) | Self::Zstd(level) => level,
115        }
116    }
117}
118
119impl fmt::Display for Compression {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        match *self {
122            Compression::None => write!(f, "none"),
123            Compression::Gzip(ref level) => write!(f, "gzip({})", level.as_flate2().level()),
124            Compression::Zlib(ref level) => write!(f, "zlib({})", level.as_flate2().level()),
125            Compression::Zstd(ref level) => {
126                write!(f, "zstd({})", ZstdCompressionLevel::from(*level))
127            }
128            Compression::Snappy => write!(f, "snappy"),
129        }
130    }
131}
132
133impl<'de> de::Deserialize<'de> for Compression {
134    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135    where
136        D: de::Deserializer<'de>,
137    {
138        struct StringOrMap;
139
140        impl<'de> de::Visitor<'de> for StringOrMap {
141            type Value = Compression;
142
143            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
144                f.write_str("string or map")
145            }
146
147            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
148            where
149                E: de::Error,
150            {
151                match s {
152                    "none" => Ok(Compression::None),
153                    "gzip" => Ok(Compression::gzip_default()),
154                    "zlib" => Ok(Compression::zlib_default()),
155                    "zstd" => Ok(Compression::zstd_default()),
156                    "snappy" => Ok(Compression::Snappy),
157                    _ => Err(de::Error::invalid_value(
158                        de::Unexpected::Str(s),
159                        &r#""none" or "gzip" or "zlib" or "zstd""#,
160                    )),
161                }
162            }
163
164            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
165            where
166                A: de::MapAccess<'de>,
167            {
168                let mut algorithm = None;
169                let mut level = None;
170
171                while let Some(key) = map.next_key::<String>()? {
172                    match key.as_str() {
173                        "algorithm" => {
174                            if algorithm.is_some() {
175                                return Err(de::Error::duplicate_field("algorithm"));
176                            }
177                            algorithm = Some(map.next_value::<String>()?);
178                        }
179                        "level" => {
180                            if level.is_some() {
181                                return Err(de::Error::duplicate_field("level"));
182                            }
183                            level = Some(map.next_value::<CompressionLevel>()?);
184                        }
185                        _ => return Err(de::Error::unknown_field(&key, &["algorithm", "level"])),
186                    };
187                }
188
189                let compression = match algorithm
190                    .ok_or_else(|| de::Error::missing_field("algorithm"))?
191                    .as_str()
192                {
193                    "none" => match level {
194                        Some(_) => Err(de::Error::unknown_field("level", &[])),
195                        None => Ok(Compression::None),
196                    },
197                    "gzip" => Ok(Compression::Gzip(level.unwrap_or_default())),
198                    "zlib" => Ok(Compression::Zlib(level.unwrap_or_default())),
199                    "zstd" => Ok(Compression::Zstd(level.unwrap_or_default())),
200                    "snappy" => match level {
201                        Some(_) => Err(de::Error::unknown_field("level", &[])),
202                        None => Ok(Compression::Snappy),
203                    },
204                    algorithm => Err(de::Error::unknown_variant(
205                        algorithm,
206                        &["none", "gzip", "zlib", "zstd", "snappy"],
207                    )),
208                }?;
209
210                if let CompressionLevel::Val(level) = compression.compression_level() {
211                    let max_level = compression.max_compression_level_val();
212                    if level > max_level {
213                        let msg = std::format!(
214                            "invalid value `{level}`, expected value in range [0, {max_level}]"
215                        );
216                        return Err(de::Error::custom(msg));
217                    }
218                }
219
220                Ok(compression)
221            }
222        }
223
224        deserializer.deserialize_any(StringOrMap)
225    }
226}
227
228impl ser::Serialize for Compression {
229    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
230    where
231        S: ser::Serializer,
232    {
233        use ser::SerializeMap;
234
235        match self {
236            Compression::None => serializer.serialize_str("none"),
237            Compression::Gzip(gzip_level) => {
238                if *gzip_level != CompressionLevel::Default {
239                    let mut map = serializer.serialize_map(None)?;
240                    map.serialize_entry("algorithm", "gzip")?;
241                    map.serialize_entry("level", &gzip_level)?;
242                    map.end()
243                } else {
244                    serializer.serialize_str("gzip")
245                }
246            }
247            Compression::Zlib(zlib_level) => {
248                if *zlib_level != CompressionLevel::Default {
249                    let mut map = serializer.serialize_map(None)?;
250                    map.serialize_entry("algorithm", "zlib")?;
251                    map.serialize_entry("level", &zlib_level)?;
252                    map.end()
253                } else {
254                    serializer.serialize_str("zlib")
255                }
256            }
257            Compression::Zstd(zstd_level) => {
258                if *zstd_level != CompressionLevel::Default {
259                    let mut map = serializer.serialize_map(None)?;
260                    map.serialize_entry("algorithm", "zstd")?;
261                    map.serialize_entry("level", &zstd_level)?;
262                    map.end()
263                } else {
264                    serializer.serialize_str("zstd")
265                }
266            }
267            Compression::Snappy => serializer.serialize_str("snappy"),
268        }
269    }
270}
271
/// Property name of the algorithm field in the object form of the compression schema.
pub const ALGORITHM_NAME: &str = "algorithm";
/// Property name of the level field in the object form of the compression schema.
pub const LEVEL_NAME: &str = "level";
/// Custom schema attribute key that records an algorithm's logical (capitalised) name.
pub const LOGICAL_NAME: &str = "logical_name";
/// Custom schema attribute key selecting the documentation's enum tagging mode.
pub const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";
276
277pub fn generate_string_schema(
278    logical_name: &str,
279    title: Option<&'static str>,
280    description: &'static str,
281) -> SchemaObject {
282    let mut const_schema = generate_const_string_schema(logical_name.to_lowercase());
283    let mut const_metadata = Metadata::with_description(description);
284    if let Some(title) = title {
285        const_metadata.set_title(title);
286    }
287    const_metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));
288    apply_base_metadata(&mut const_schema, const_metadata);
289    const_schema
290}
291
292// TODO: Consider an approach for generating schema of "string or object" structure used by this type.
293impl Configurable for Compression {
294    fn referenceable_name() -> Option<&'static str> {
295        Some(std::any::type_name::<Self>())
296    }
297
298    fn metadata() -> Metadata {
299        let mut metadata = Metadata::default();
300        metadata.set_title("Compression configuration.");
301        metadata.set_description("All compression algorithms use the default compression level unless otherwise specified.");
302        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
303        metadata.add_custom_attribute(CustomAttribute::flag("docs::advanced"));
304        metadata
305    }
306
307    fn generate_schema(
308        generator: &RefCell<SchemaGenerator>,
309    ) -> Result<SchemaObject, GenerateError> {
310        // First, we'll create the string-only subschemas for each algorithm, and wrap those up
311        // within a one-of schema.
312        let mut string_metadata = Metadata::with_description("Compression algorithm.");
313        string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
314
315        let none_string_subschema = generate_string_schema("None", None, "No compression.");
316        let gzip_string_subschema = generate_string_schema(
317            "Gzip",
318            Some("[Gzip][gzip] compression."),
319            "[gzip]: https://www.gzip.org/",
320        );
321        let zlib_string_subschema = generate_string_schema(
322            "Zlib",
323            Some("[Zlib][zlib] compression."),
324            "[zlib]: https://zlib.net/",
325        );
326
327        let zstd_string_subschema = generate_string_schema(
328            "Zstd",
329            Some("[Zstandard][zstd] compression."),
330            "[zstd]: https://facebook.github.io/zstd/",
331        );
332
333        let snappy_string_subschema = generate_string_schema(
334            "Snappy",
335            Some("[Snappy][snappy] compression."),
336            "[snappy]: https://github.com/google/snappy/blob/main/docs/README.md",
337        );
338
339        let mut all_string_oneof_subschema = generate_one_of_schema(&[
340            none_string_subschema,
341            gzip_string_subschema,
342            zlib_string_subschema,
343            zstd_string_subschema,
344            snappy_string_subschema,
345        ]);
346        apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
347
348        // Next we'll create a full schema for the given algorithms.
349        //
350        // TODO: We're currently using all three algorithms in the enum subschema for `algorithm`,
351        // but in reality, `level` is never used when the algorithm is `none`. This is _currently_
352        // fine because the field is optional, and we don't use `deny_unknown_fields`, so if users
353        // specify it when the algorithm is `none`: no harm, no foul.
354        //
355        // However, it does lead to a suboptimal schema being generated, one that sort of implies it
356        // may have value when set, even if the algorithm is `none`. We do this because, otherwise,
357        // it's very hard to reconcile the resolved schemas during component documentation
358        // generation, where we need to be able to generate the right enum key/value pair for the
359        // `none` algorithm as part of the overall set of enum values declared for the `algorithm`
360        // field in the "full" schema version.
361        let compression_level_schema =
362            get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
363
364        let mut required = BTreeSet::new();
365        required.insert(ALGORITHM_NAME.to_string());
366
367        let mut properties = IndexMap::new();
368        properties.insert(
369            ALGORITHM_NAME.to_string(),
370            all_string_oneof_subschema.clone(),
371        );
372        properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
373
374        let mut full_subschema = generate_struct_schema(properties, required, None);
375        let mut full_metadata =
376            Metadata::with_description("Compression algorithm and compression level.");
377        full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
378        apply_base_metadata(&mut full_subschema, full_metadata);
379
380        // Finally, we zip both schemas together.
381        Ok(generate_one_of_schema(&[
382            all_string_oneof_subschema,
383            full_subschema,
384        ]))
385    }
386}
387
388impl ToValue for Compression {
389    fn to_value(&self) -> Value {
390        serde_json::to_value(self).expect("Could not convert compression settings to JSON")
391    }
392}
393
/// Compression level.
///
/// Either a named preset or an explicit numeric level ([`CompressionLevel::Val`]).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CompressionLevel {
    /// The `"none"` preset.
    None,
    /// The `"default"` preset; also used when no level is configured.
    #[default]
    Default,
    /// The `"best"` preset.
    Best,
    /// The `"fast"` preset.
    Fast,
    /// An explicit numeric level.
    Val(u32),
}
404
405impl CompressionLevel {
406    pub const fn const_default() -> Self {
407        CompressionLevel::Default
408    }
409
410    pub fn as_flate2(self) -> flate2::Compression {
411        match self {
412            CompressionLevel::None => flate2::Compression::none(),
413            CompressionLevel::Default => flate2::Compression::default(),
414            CompressionLevel::Best => flate2::Compression::best(),
415            CompressionLevel::Fast => flate2::Compression::fast(),
416            CompressionLevel::Val(level) => flate2::Compression::new(level),
417        }
418    }
419}
420
421impl<'de> de::Deserialize<'de> for CompressionLevel {
422    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423    where
424        D: de::Deserializer<'de>,
425    {
426        struct NumberOrString;
427
428        impl de::Visitor<'_> for NumberOrString {
429            type Value = CompressionLevel;
430
431            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
432                f.write_str("unsigned number or string")
433            }
434
435            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
436            where
437                E: de::Error,
438            {
439                match s {
440                    "none" => Ok(CompressionLevel::None),
441                    "fast" => Ok(CompressionLevel::Fast),
442                    "default" => Ok(CompressionLevel::Default),
443                    "best" => Ok(CompressionLevel::Best),
444                    level => Err(de::Error::invalid_value(
445                        de::Unexpected::Str(level),
446                        &r#""none", "fast", "best" or "default""#,
447                    )),
448                }
449            }
450
451            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
452            where
453                E: de::Error,
454            {
455                u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
456                    de::Error::custom(format!(
457                        "unsigned integer could not be converted to u32: {err}"
458                    ))
459                })
460            }
461
462            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
463            where
464                E: de::Error,
465            {
466                u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
467                    de::Error::custom(format!("integer could not be converted to u32: {err}"))
468                })
469            }
470        }
471
472        deserializer.deserialize_any(NumberOrString)
473    }
474}
475
476impl ser::Serialize for CompressionLevel {
477    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
478    where
479        S: ser::Serializer,
480    {
481        match *self {
482            CompressionLevel::None => serializer.serialize_str("none"),
483            CompressionLevel::Default => serializer.serialize_str("default"),
484            CompressionLevel::Best => serializer.serialize_str("best"),
485            CompressionLevel::Fast => serializer.serialize_str("fast"),
486            CompressionLevel::Val(level) => serializer.serialize_u64(u64::from(level)),
487        }
488    }
489}
490
491// TODO: Consider an approach for generating schema of "string or number" structure used by this type.
492impl Configurable for CompressionLevel {
493    fn referenceable_name() -> Option<&'static str> {
494        Some(std::any::type_name::<Self>())
495    }
496
497    fn metadata() -> Metadata {
498        let mut metadata = Metadata::default();
499        metadata.set_description("Compression level.");
500        metadata
501    }
502
503    fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
504        let string_consts = ["none", "fast", "best", "default"]
505            .iter()
506            .map(|s| serde_json::Value::from(*s));
507
508        let level_consts = (0u32..=21).map(serde_json::Value::from);
509
510        let valid_values = string_consts.chain(level_consts).collect();
511        Ok(generate_enum_schema(valid_values))
512    }
513}
514
515impl ToValue for CompressionLevel {
516    fn to_value(&self) -> Value {
517        // FIXME
518        serde_json::to_value(self).expect("Could not convert compression level to JSON")
519    }
520}
521
#[cfg(test)]
mod test {
    use super::{Compression, CompressionLevel};

    #[test]
    fn deserialization_json() {
        let fixtures_valid = [
            (r#""none""#, Compression::None),
            (r#""gzip""#, Compression::Gzip(CompressionLevel::default())),
            (r#""zlib""#, Compression::Zlib(CompressionLevel::default())),
            (r#""zstd""#, Compression::Zstd(CompressionLevel::default())),
            (r#""snappy""#, Compression::Snappy),
            (r#"{"algorithm": "none"}"#, Compression::None),
            (
                r#"{"algorithm": "gzip"}"#,
                Compression::Gzip(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "gzip", "level": "best"}"#,
                Compression::Gzip(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "gzip", "level": 8}"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zlib"}"#,
                Compression::Zlib(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zlib", "level": "best"}"#,
                Compression::Zlib(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "zlib", "level": 8}"#,
                Compression::Zlib(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zstd", "level": "fast"}"#,
                Compression::Zstd(CompressionLevel::Fast),
            ),
            (
                r#"{"algorithm": "zstd", "level": 21}"#,
                Compression::Zstd(CompressionLevel::Val(21)),
            ),
            (r#"{"algorithm": "snappy"}"#, Compression::Snappy),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }

        let fixtures_invalid = [
            (
                r"42",
                r"invalid type: integer `42`, expected string or map at line 1 column 2",
            ),
            (
                r#""b42""#,
                r#"invalid value: string "b42", expected "none" or "gzip" or "zlib" or "zstd" at line 1 column 5"#,
            ),
            (
                r#"{"algorithm": "b42"}"#,
                r"unknown variant `b42`, expected one of `none`, `gzip`, `zlib`, `zstd`, `snappy` at line 1 column 20",
            ),
            (
                r#"{"algorithm": "none", "level": "default"}"#,
                r"unknown field `level`, there are no fields at line 1 column 41",
            ),
            (
                r#"{"algorithm": "gzip", "level": -1}"#,
                r"integer could not be converted to u32: out of range integral type conversion attempted at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "good"}"#,
                r#"invalid value: string "good", expected "none", "fast", "best" or "default" at line 1 column 37"#,
            ),
            (
                r#"{"algorithm": "gzip", "level": {}}"#,
                r"invalid type: map, expected unsigned number or string at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "default", "key": 42}"#,
                r"unknown field `key`, expected `algorithm` or `level` at line 1 column 47",
            ),
            (
                r#"{"algorithm": "gzip", "level": 10}"#,
                r"invalid value `10`, expected value in range [0, 9] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "zstd", "level": 22}"#,
                r"invalid value `22`, expected value in range [0, 21] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "snappy", "level": 3}"#,
                r"unknown field `level`, there are no fields at line 1 column 35",
            ),
            (
                r#"{"level": 3}"#,
                r"missing field `algorithm` at line 1 column 12",
            ),
        ];
        for (source, expected) in fixtures_invalid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            let error = deserialized.expect_err("invalid source");
            assert_eq!(error.to_string().as_str(), *expected, "source: {source}");
        }
    }

    #[test]
    fn deserialization_toml() {
        let fixtures_valid = [
            // TOML differs from YAML and JSON by always parsing integers as signed
            (
                r#"algorithm = "gzip"
                   level = 8"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = toml::from_str(source);
            assert_eq!(deserialized.expect("valid source"), *expected);
        }
    }

    #[test]
    fn from_and_to_value() {
        let fixtures_valid = [
            Compression::None,
            Compression::Gzip(CompressionLevel::default()),
            Compression::Gzip(CompressionLevel::Val(7)),
            Compression::Zlib(CompressionLevel::Best),
            Compression::Zlib(CompressionLevel::Val(7)),
            Compression::Zstd(CompressionLevel::Val(6)),
            Compression::Zstd(CompressionLevel::default()),
            Compression::Zstd(CompressionLevel::Best),
            Compression::Zstd(CompressionLevel::Fast),
            Compression::Snappy,
        ];

        for v in fixtures_valid {
            // Serialize, deserialize, and make sure we get back exactly what we started with.
            let value = serde_json::to_value(v).unwrap();
            let round_tripped = serde_json::from_value::<Compression>(value).unwrap();
            assert_eq!(round_tripped, v);
        }
    }
}