vector/sinks/util/buffer/
compression.rs

1use std::{cell::RefCell, collections::BTreeSet, fmt};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::attributes::CustomAttribute;
7use vector_lib::configurable::{
8    schema::{
9        apply_base_metadata, generate_const_string_schema, generate_enum_schema,
10        generate_one_of_schema, generate_struct_schema, get_or_generate_schema, SchemaGenerator,
11        SchemaObject,
12    },
13    Configurable, GenerateError, Metadata, ToValue,
14};
15
16use crate::sinks::util::zstd::ZstdCompressionLevel;
17
18/// Compression configuration.
19#[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)]
20#[derivative(Default)]
21pub enum Compression {
22    /// No compression.
23    #[derivative(Default)]
24    None,
25
26    /// [Gzip][gzip] compression.
27    ///
28    /// [gzip]: https://www.gzip.org/
29    Gzip(CompressionLevel),
30
31    /// [Zlib][zlib] compression.
32    ///
33    /// [zlib]: https://zlib.net/
34    Zlib(CompressionLevel),
35
36    /// [Zstandard][zstd] compression.
37    ///
38    /// [zstd]: https://facebook.github.io/zstd/
39    Zstd(CompressionLevel),
40
41    /// [Snappy][snappy] compression.
42    ///
43    /// [snappy]: https://github.com/google/snappy/blob/main/docs/README.md
44    Snappy,
45}
46
47impl Compression {
48    /// Gets whether or not this compression will actually compress the input.
49    ///
50    /// While it may be counterintuitive for "compression" to not compress, this is simply a
51    /// consequence of designing a single type that may or may not compress so that we can avoid
52    /// having to box writers at a higher-level.
53    ///
54    /// Some callers can benefit from knowing whether or not compression is actually taking place,
55    /// as different size limitations may come into play.
56    pub const fn is_compressed(&self) -> bool {
57        !matches!(self, Compression::None)
58    }
59
60    pub const fn gzip_default() -> Compression {
61        Compression::Gzip(CompressionLevel::const_default())
62    }
63
64    pub const fn zlib_default() -> Compression {
65        Compression::Zlib(CompressionLevel::const_default())
66    }
67
68    pub const fn zstd_default() -> Compression {
69        Compression::Zstd(CompressionLevel::const_default())
70    }
71
72    pub const fn content_encoding(self) -> Option<&'static str> {
73        match self {
74            Self::None => None,
75            Self::Gzip(_) => Some("gzip"),
76            Self::Zlib(_) => Some("deflate"),
77            Self::Zstd(_) => Some("zstd"),
78            Self::Snappy => Some("snappy"),
79        }
80    }
81
82    pub const fn accept_encoding(self) -> Option<&'static str> {
83        match self {
84            Self::Gzip(_) => Some("gzip"),
85            Self::Zlib(_) => Some("deflate"),
86            Self::Zstd(_) => Some("zstd"),
87            Self::Snappy => Some("snappy"),
88            _ => None,
89        }
90    }
91
92    pub const fn extension(self) -> &'static str {
93        match self {
94            Self::None => "log",
95            Self::Gzip(_) => "log.gz",
96            Self::Zlib(_) => "log.zz",
97            Self::Zstd(_) => "log.zst",
98            Self::Snappy => "log.snappy",
99        }
100    }
101
102    pub const fn max_compression_level_val(self) -> u32 {
103        match self {
104            Compression::None => 0,
105            Compression::Gzip(_) => 9,
106            Compression::Zlib(_) => 9,
107            Compression::Zstd(_) => 21,
108            Compression::Snappy => 0,
109        }
110    }
111
112    pub const fn compression_level(self) -> CompressionLevel {
113        match self {
114            Self::None | Self::Snappy => CompressionLevel::None,
115            Self::Gzip(level) | Self::Zlib(level) | Self::Zstd(level) => level,
116        }
117    }
118}
119
120impl fmt::Display for Compression {
121    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122        match *self {
123            Compression::None => write!(f, "none"),
124            Compression::Gzip(ref level) => write!(f, "gzip({})", level.as_flate2().level()),
125            Compression::Zlib(ref level) => write!(f, "zlib({})", level.as_flate2().level()),
126            Compression::Zstd(ref level) => {
127                write!(f, "zstd({})", ZstdCompressionLevel::from(*level))
128            }
129            Compression::Snappy => write!(f, "snappy"),
130        }
131    }
132}
133
134impl<'de> de::Deserialize<'de> for Compression {
135    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
136    where
137        D: de::Deserializer<'de>,
138    {
139        struct StringOrMap;
140
141        impl<'de> de::Visitor<'de> for StringOrMap {
142            type Value = Compression;
143
144            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
145                f.write_str("string or map")
146            }
147
148            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
149            where
150                E: de::Error,
151            {
152                match s {
153                    "none" => Ok(Compression::None),
154                    "gzip" => Ok(Compression::gzip_default()),
155                    "zlib" => Ok(Compression::zlib_default()),
156                    "zstd" => Ok(Compression::zstd_default()),
157                    "snappy" => Ok(Compression::Snappy),
158                    _ => Err(de::Error::invalid_value(
159                        de::Unexpected::Str(s),
160                        &r#""none" or "gzip" or "zlib" or "zstd""#,
161                    )),
162                }
163            }
164
165            fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
166            where
167                A: de::MapAccess<'de>,
168            {
169                let mut algorithm = None;
170                let mut level = None;
171
172                while let Some(key) = map.next_key::<String>()? {
173                    match key.as_str() {
174                        "algorithm" => {
175                            if algorithm.is_some() {
176                                return Err(de::Error::duplicate_field("algorithm"));
177                            }
178                            algorithm = Some(map.next_value::<String>()?);
179                        }
180                        "level" => {
181                            if level.is_some() {
182                                return Err(de::Error::duplicate_field("level"));
183                            }
184                            level = Some(map.next_value::<CompressionLevel>()?);
185                        }
186                        _ => return Err(de::Error::unknown_field(&key, &["algorithm", "level"])),
187                    };
188                }
189
190                let compression = match algorithm
191                    .ok_or_else(|| de::Error::missing_field("algorithm"))?
192                    .as_str()
193                {
194                    "none" => match level {
195                        Some(_) => Err(de::Error::unknown_field("level", &[])),
196                        None => Ok(Compression::None),
197                    },
198                    "gzip" => Ok(Compression::Gzip(level.unwrap_or_default())),
199                    "zlib" => Ok(Compression::Zlib(level.unwrap_or_default())),
200                    "zstd" => Ok(Compression::Zstd(level.unwrap_or_default())),
201                    "snappy" => match level {
202                        Some(_) => Err(de::Error::unknown_field("level", &[])),
203                        None => Ok(Compression::Snappy),
204                    },
205                    algorithm => Err(de::Error::unknown_variant(
206                        algorithm,
207                        &["none", "gzip", "zlib", "zstd", "snappy"],
208                    )),
209                }?;
210
211                if let CompressionLevel::Val(level) = compression.compression_level() {
212                    let max_level = compression.max_compression_level_val();
213                    if level > max_level {
214                        let msg = std::format!(
215                            "invalid value `{level}`, expected value in range [0, {max_level}]"
216                        );
217                        return Err(de::Error::custom(msg));
218                    }
219                }
220
221                Ok(compression)
222            }
223        }
224
225        deserializer.deserialize_any(StringOrMap)
226    }
227}
228
229impl ser::Serialize for Compression {
230    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
231    where
232        S: ser::Serializer,
233    {
234        use ser::SerializeMap;
235
236        match self {
237            Compression::None => serializer.serialize_str("none"),
238            Compression::Gzip(gzip_level) => {
239                if *gzip_level != CompressionLevel::Default {
240                    let mut map = serializer.serialize_map(None)?;
241                    map.serialize_entry("algorithm", "gzip")?;
242                    map.serialize_entry("level", &gzip_level)?;
243                    map.end()
244                } else {
245                    serializer.serialize_str("gzip")
246                }
247            }
248            Compression::Zlib(zlib_level) => {
249                if *zlib_level != CompressionLevel::Default {
250                    let mut map = serializer.serialize_map(None)?;
251                    map.serialize_entry("algorithm", "zlib")?;
252                    map.serialize_entry("level", &zlib_level)?;
253                    map.end()
254                } else {
255                    serializer.serialize_str("zlib")
256                }
257            }
258            Compression::Zstd(zstd_level) => {
259                if *zstd_level != CompressionLevel::Default {
260                    let mut map = serializer.serialize_map(None)?;
261                    map.serialize_entry("algorithm", "zstd")?;
262                    map.serialize_entry("level", &zstd_level)?;
263                    map.end()
264                } else {
265                    serializer.serialize_str("zstd")
266                }
267            }
268            Compression::Snappy => serializer.serialize_str("snappy"),
269        }
270    }
271}
272
/// Property name of the algorithm field in the object form of the compression schema.
pub const ALGORITHM_NAME: &str = "algorithm";

/// Property name of the optional level field in the object form of the compression schema.
pub const LEVEL_NAME: &str = "level";

/// Custom schema attribute key holding an algorithm's display name (e.g. `Gzip`). The schema
/// constant itself is the lowercased form.
pub const LOGICAL_NAME: &str = "logical_name";

/// Custom schema attribute key for the enum tagging mode (set to `"external"` in this file).
pub const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";
277
278pub fn generate_string_schema(
279    logical_name: &str,
280    title: Option<&'static str>,
281    description: &'static str,
282) -> SchemaObject {
283    let mut const_schema = generate_const_string_schema(logical_name.to_lowercase());
284    let mut const_metadata = Metadata::with_description(description);
285    if let Some(title) = title {
286        const_metadata.set_title(title);
287    }
288    const_metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));
289    apply_base_metadata(&mut const_schema, const_metadata);
290    const_schema
291}
292
293// TODO: Consider an approach for generating schema of "string or object" structure used by this type.
294impl Configurable for Compression {
295    fn referenceable_name() -> Option<&'static str> {
296        Some(std::any::type_name::<Self>())
297    }
298
299    fn metadata() -> Metadata {
300        let mut metadata = Metadata::default();
301        metadata.set_title("Compression configuration.");
302        metadata.set_description("All compression algorithms use the default compression level unless otherwise specified.");
303        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
304        metadata.add_custom_attribute(CustomAttribute::flag("docs::advanced"));
305        metadata
306    }
307
308    fn generate_schema(
309        generator: &RefCell<SchemaGenerator>,
310    ) -> Result<SchemaObject, GenerateError> {
311        // First, we'll create the string-only subschemas for each algorithm, and wrap those up
312        // within a one-of schema.
313        let mut string_metadata = Metadata::with_description("Compression algorithm.");
314        string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
315
316        let none_string_subschema = generate_string_schema("None", None, "No compression.");
317        let gzip_string_subschema = generate_string_schema(
318            "Gzip",
319            Some("[Gzip][gzip] compression."),
320            "[gzip]: https://www.gzip.org/",
321        );
322        let zlib_string_subschema = generate_string_schema(
323            "Zlib",
324            Some("[Zlib][zlib] compression."),
325            "[zlib]: https://zlib.net/",
326        );
327
328        let zstd_string_subschema = generate_string_schema(
329            "Zstd",
330            Some("[Zstandard][zstd] compression."),
331            "[zstd]: https://facebook.github.io/zstd/",
332        );
333
334        let snappy_string_subschema = generate_string_schema(
335            "Snappy",
336            Some("[Snappy][snappy] compression."),
337            "[snappy]: https://github.com/google/snappy/blob/main/docs/README.md",
338        );
339
340        let mut all_string_oneof_subschema = generate_one_of_schema(&[
341            none_string_subschema,
342            gzip_string_subschema,
343            zlib_string_subschema,
344            zstd_string_subschema,
345            snappy_string_subschema,
346        ]);
347        apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
348
349        // Next we'll create a full schema for the given algorithms.
350        //
351        // TODO: We're currently using all three algorithms in the enum subschema for `algorithm`,
352        // but in reality, `level` is never used when the algorithm is `none`. This is _currently_
353        // fine because the field is optional, and we don't use `deny_unknown_fields`, so if users
354        // specify it when the algorithm is `none`: no harm, no foul.
355        //
356        // However, it does lead to a suboptimal schema being generated, one that sort of implies it
357        // may have value when set, even if the algorithm is `none`. We do this because, otherwise,
358        // it's very hard to reconcile the resolved schemas during component documentation
359        // generation, where we need to be able to generate the right enum key/value pair for the
360        // `none` algorithm as part of the overall set of enum values declared for the `algorithm`
361        // field in the "full" schema version.
362        let compression_level_schema =
363            get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
364
365        let mut required = BTreeSet::new();
366        required.insert(ALGORITHM_NAME.to_string());
367
368        let mut properties = IndexMap::new();
369        properties.insert(
370            ALGORITHM_NAME.to_string(),
371            all_string_oneof_subschema.clone(),
372        );
373        properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
374
375        let mut full_subschema = generate_struct_schema(properties, required, None);
376        let mut full_metadata =
377            Metadata::with_description("Compression algorithm and compression level.");
378        full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
379        apply_base_metadata(&mut full_subschema, full_metadata);
380
381        // Finally, we zip both schemas together.
382        Ok(generate_one_of_schema(&[
383            all_string_oneof_subschema,
384            full_subschema,
385        ]))
386    }
387}
388
389impl ToValue for Compression {
390    fn to_value(&self) -> Value {
391        serde_json::to_value(self).expect("Could not convert compression settings to JSON")
392    }
393}
394
/// Compression level.
///
/// Named levels are presets; `Val` carries an explicit numeric level. The default is `Default`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// No compression; maps to `flate2::Compression::none()` for flate2-based algorithms.
    None,
    /// The algorithm's default level.
    #[default]
    Default,
    /// Favor compression ratio over speed; maps to `flate2::Compression::best()`.
    Best,
    /// Favor speed over compression ratio; maps to `flate2::Compression::fast()`.
    Fast,
    /// An explicit numeric level. The accepted range depends on the algorithm.
    Val(u32),
}
405
406impl CompressionLevel {
407    pub const fn const_default() -> Self {
408        CompressionLevel::Default
409    }
410
411    pub fn as_flate2(self) -> flate2::Compression {
412        match self {
413            CompressionLevel::None => flate2::Compression::none(),
414            CompressionLevel::Default => flate2::Compression::default(),
415            CompressionLevel::Best => flate2::Compression::best(),
416            CompressionLevel::Fast => flate2::Compression::fast(),
417            CompressionLevel::Val(level) => flate2::Compression::new(level),
418        }
419    }
420}
421
422impl<'de> de::Deserialize<'de> for CompressionLevel {
423    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
424    where
425        D: de::Deserializer<'de>,
426    {
427        struct NumberOrString;
428
429        impl de::Visitor<'_> for NumberOrString {
430            type Value = CompressionLevel;
431
432            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
433                f.write_str("unsigned number or string")
434            }
435
436            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
437            where
438                E: de::Error,
439            {
440                match s {
441                    "none" => Ok(CompressionLevel::None),
442                    "fast" => Ok(CompressionLevel::Fast),
443                    "default" => Ok(CompressionLevel::Default),
444                    "best" => Ok(CompressionLevel::Best),
445                    level => Err(de::Error::invalid_value(
446                        de::Unexpected::Str(level),
447                        &r#""none", "fast", "best" or "default""#,
448                    )),
449                }
450            }
451
452            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
453            where
454                E: de::Error,
455            {
456                u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
457                    de::Error::custom(format!(
458                        "unsigned integer could not be converted to u32: {err}"
459                    ))
460                })
461            }
462
463            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
464            where
465                E: de::Error,
466            {
467                u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
468                    de::Error::custom(format!("integer could not be converted to u32: {err}"))
469                })
470            }
471        }
472
473        deserializer.deserialize_any(NumberOrString)
474    }
475}
476
477impl ser::Serialize for CompressionLevel {
478    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
479    where
480        S: ser::Serializer,
481    {
482        match *self {
483            CompressionLevel::None => serializer.serialize_str("none"),
484            CompressionLevel::Default => serializer.serialize_str("default"),
485            CompressionLevel::Best => serializer.serialize_str("best"),
486            CompressionLevel::Fast => serializer.serialize_str("fast"),
487            CompressionLevel::Val(level) => serializer.serialize_u64(u64::from(level)),
488        }
489    }
490}
491
492// TODO: Consider an approach for generating schema of "string or number" structure used by this type.
493impl Configurable for CompressionLevel {
494    fn referenceable_name() -> Option<&'static str> {
495        Some(std::any::type_name::<Self>())
496    }
497
498    fn metadata() -> Metadata {
499        let mut metadata = Metadata::default();
500        metadata.set_description("Compression level.");
501        metadata
502    }
503
504    fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
505        let string_consts = ["none", "fast", "best", "default"]
506            .iter()
507            .map(|s| serde_json::Value::from(*s));
508
509        let level_consts = (0u32..=21).map(serde_json::Value::from);
510
511        let valid_values = string_consts.chain(level_consts).collect();
512        Ok(generate_enum_schema(valid_values))
513    }
514}
515
516impl ToValue for CompressionLevel {
517    fn to_value(&self) -> Value {
518        // FIXME
519        serde_json::to_value(self).expect("Could not convert compression level to JSON")
520    }
521}
522
#[cfg(test)]
mod test {
    use super::{Compression, CompressionLevel};

    #[test]
    fn deserialization_json() {
        let fixtures_valid = [
            (r#""none""#, Compression::None),
            (r#""gzip""#, Compression::Gzip(CompressionLevel::default())),
            (r#""zlib""#, Compression::Zlib(CompressionLevel::default())),
            (r#""zstd""#, Compression::Zstd(CompressionLevel::default())),
            (r#""snappy""#, Compression::Snappy),
            (r#"{"algorithm": "none"}"#, Compression::None),
            (
                r#"{"algorithm": "gzip"}"#,
                Compression::Gzip(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "gzip", "level": "best"}"#,
                Compression::Gzip(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "gzip", "level": 8}"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zlib"}"#,
                Compression::Zlib(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zlib", "level": "best"}"#,
                Compression::Zlib(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "zlib", "level": 8}"#,
                Compression::Zlib(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zstd"}"#,
                Compression::Zstd(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zstd", "level": "fast"}"#,
                Compression::Zstd(CompressionLevel::Fast),
            ),
            // Upper boundary of the zstd range is accepted.
            (
                r#"{"algorithm": "zstd", "level": 21}"#,
                Compression::Zstd(CompressionLevel::Val(21)),
            ),
            (r#"{"algorithm": "snappy"}"#, Compression::Snappy),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }

        let fixtures_invalid = [
            (
                r"42",
                r"invalid type: integer `42`, expected string or map at line 1 column 2",
            ),
            (
                r#""b42""#,
                r#"invalid value: string "b42", expected "none" or "gzip" or "zlib" or "zstd" at line 1 column 5"#,
            ),
            (
                r#"{"algorithm": "b42"}"#,
                r"unknown variant `b42`, expected one of `none`, `gzip`, `zlib`, `zstd`, `snappy` at line 1 column 20",
            ),
            (
                r#"{"algorithm": "none", "level": "default"}"#,
                r"unknown field `level`, there are no fields at line 1 column 41",
            ),
            (
                r#"{"algorithm": "gzip", "level": -1}"#,
                r"integer could not be converted to u32: out of range integral type conversion attempted at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "good"}"#,
                r#"invalid value: string "good", expected "none", "fast", "best" or "default" at line 1 column 37"#,
            ),
            (
                r#"{"algorithm": "gzip", "level": {}}"#,
                r"invalid type: map, expected unsigned number or string at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "default", "key": 42}"#,
                r"unknown field `key`, expected `algorithm` or `level` at line 1 column 47",
            ),
            (
                r#"{"algorithm": "gzip", "level": 10}"#,
                r"invalid value `10`, expected value in range [0, 9] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "zstd", "level": 22}"#,
                r"invalid value `22`, expected value in range [0, 21] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "snappy", "level": 3}"#,
                r"unknown field `level`, there are no fields at line 1 column 35",
            ),
        ];
        for (source, expected) in fixtures_invalid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            let error = deserialized.expect_err("invalid source");
            assert_eq!(error.to_string().as_str(), *expected, "source: {source}");
        }
    }

    #[test]
    fn deserialization_toml() {
        let fixtures_valid = [
            // TOML differs from YAML and JSON by always parsing integers as signed
            (
                r#"algorithm = "gzip"
                   level = 8"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
        ];
        for (sources, result) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = toml::from_str(sources);
            assert_eq!(deserialized.expect("valid source"), *result);
        }
    }

    #[test]
    fn from_and_to_value() {
        let fixtures_valid = [
            Compression::None,
            Compression::Snappy,
            Compression::Gzip(CompressionLevel::default()),
            Compression::Gzip(CompressionLevel::Val(7)),
            Compression::Zlib(CompressionLevel::Best),
            Compression::Zlib(CompressionLevel::Val(7)),
            Compression::Zstd(CompressionLevel::Val(6)),
            Compression::Zstd(CompressionLevel::default()),
            Compression::Zstd(CompressionLevel::Best),
            Compression::Zstd(CompressionLevel::Fast),
        ];

        for v in fixtures_valid {
            // The serialize-deserialize round trip must reproduce the original value, not merely
            // produce something that parses.
            let value = serde_json::to_value(v).unwrap();
            let round_tripped = serde_json::from_value::<Compression>(value).unwrap();
            assert_eq!(round_tripped, v);
        }
    }
}