1use std::{cell::RefCell, collections::BTreeSet, fmt};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::attributes::CustomAttribute;
7use vector_lib::configurable::{
8 schema::{
9 apply_base_metadata, generate_const_string_schema, generate_enum_schema,
10 generate_one_of_schema, generate_struct_schema, get_or_generate_schema, SchemaGenerator,
11 SchemaObject,
12 },
13 Configurable, GenerateError, Metadata, ToValue,
14};
15
16use crate::sinks::util::zstd::ZstdCompressionLevel;
17
18#[derive(Copy, Clone, Debug, Derivative, Eq, PartialEq)]
20#[derivative(Default)]
21pub enum Compression {
22 #[derivative(Default)]
24 None,
25
26 Gzip(CompressionLevel),
30
31 Zlib(CompressionLevel),
35
36 Zstd(CompressionLevel),
40
41 Snappy,
45}
46
47impl Compression {
48 pub const fn is_compressed(&self) -> bool {
57 !matches!(self, Compression::None)
58 }
59
60 pub const fn gzip_default() -> Compression {
61 Compression::Gzip(CompressionLevel::const_default())
62 }
63
64 pub const fn zlib_default() -> Compression {
65 Compression::Zlib(CompressionLevel::const_default())
66 }
67
68 pub const fn zstd_default() -> Compression {
69 Compression::Zstd(CompressionLevel::const_default())
70 }
71
72 pub const fn content_encoding(self) -> Option<&'static str> {
73 match self {
74 Self::None => None,
75 Self::Gzip(_) => Some("gzip"),
76 Self::Zlib(_) => Some("deflate"),
77 Self::Zstd(_) => Some("zstd"),
78 Self::Snappy => Some("snappy"),
79 }
80 }
81
82 pub const fn accept_encoding(self) -> Option<&'static str> {
83 match self {
84 Self::Gzip(_) => Some("gzip"),
85 Self::Zlib(_) => Some("deflate"),
86 Self::Zstd(_) => Some("zstd"),
87 Self::Snappy => Some("snappy"),
88 _ => None,
89 }
90 }
91
92 pub const fn extension(self) -> &'static str {
93 match self {
94 Self::None => "log",
95 Self::Gzip(_) => "log.gz",
96 Self::Zlib(_) => "log.zz",
97 Self::Zstd(_) => "log.zst",
98 Self::Snappy => "log.snappy",
99 }
100 }
101
102 pub const fn max_compression_level_val(self) -> u32 {
103 match self {
104 Compression::None => 0,
105 Compression::Gzip(_) => 9,
106 Compression::Zlib(_) => 9,
107 Compression::Zstd(_) => 21,
108 Compression::Snappy => 0,
109 }
110 }
111
112 pub const fn compression_level(self) -> CompressionLevel {
113 match self {
114 Self::None | Self::Snappy => CompressionLevel::None,
115 Self::Gzip(level) | Self::Zlib(level) | Self::Zstd(level) => level,
116 }
117 }
118}
119
120impl fmt::Display for Compression {
121 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122 match *self {
123 Compression::None => write!(f, "none"),
124 Compression::Gzip(ref level) => write!(f, "gzip({})", level.as_flate2().level()),
125 Compression::Zlib(ref level) => write!(f, "zlib({})", level.as_flate2().level()),
126 Compression::Zstd(ref level) => {
127 write!(f, "zstd({})", ZstdCompressionLevel::from(*level))
128 }
129 Compression::Snappy => write!(f, "snappy"),
130 }
131 }
132}
133
134impl<'de> de::Deserialize<'de> for Compression {
135 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
136 where
137 D: de::Deserializer<'de>,
138 {
139 struct StringOrMap;
140
141 impl<'de> de::Visitor<'de> for StringOrMap {
142 type Value = Compression;
143
144 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 f.write_str("string or map")
146 }
147
148 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
149 where
150 E: de::Error,
151 {
152 match s {
153 "none" => Ok(Compression::None),
154 "gzip" => Ok(Compression::gzip_default()),
155 "zlib" => Ok(Compression::zlib_default()),
156 "zstd" => Ok(Compression::zstd_default()),
157 "snappy" => Ok(Compression::Snappy),
158 _ => Err(de::Error::invalid_value(
159 de::Unexpected::Str(s),
160 &r#""none" or "gzip" or "zlib" or "zstd""#,
161 )),
162 }
163 }
164
165 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
166 where
167 A: de::MapAccess<'de>,
168 {
169 let mut algorithm = None;
170 let mut level = None;
171
172 while let Some(key) = map.next_key::<String>()? {
173 match key.as_str() {
174 "algorithm" => {
175 if algorithm.is_some() {
176 return Err(de::Error::duplicate_field("algorithm"));
177 }
178 algorithm = Some(map.next_value::<String>()?);
179 }
180 "level" => {
181 if level.is_some() {
182 return Err(de::Error::duplicate_field("level"));
183 }
184 level = Some(map.next_value::<CompressionLevel>()?);
185 }
186 _ => return Err(de::Error::unknown_field(&key, &["algorithm", "level"])),
187 };
188 }
189
190 let compression = match algorithm
191 .ok_or_else(|| de::Error::missing_field("algorithm"))?
192 .as_str()
193 {
194 "none" => match level {
195 Some(_) => Err(de::Error::unknown_field("level", &[])),
196 None => Ok(Compression::None),
197 },
198 "gzip" => Ok(Compression::Gzip(level.unwrap_or_default())),
199 "zlib" => Ok(Compression::Zlib(level.unwrap_or_default())),
200 "zstd" => Ok(Compression::Zstd(level.unwrap_or_default())),
201 "snappy" => match level {
202 Some(_) => Err(de::Error::unknown_field("level", &[])),
203 None => Ok(Compression::Snappy),
204 },
205 algorithm => Err(de::Error::unknown_variant(
206 algorithm,
207 &["none", "gzip", "zlib", "zstd", "snappy"],
208 )),
209 }?;
210
211 if let CompressionLevel::Val(level) = compression.compression_level() {
212 let max_level = compression.max_compression_level_val();
213 if level > max_level {
214 let msg = std::format!(
215 "invalid value `{level}`, expected value in range [0, {max_level}]"
216 );
217 return Err(de::Error::custom(msg));
218 }
219 }
220
221 Ok(compression)
222 }
223 }
224
225 deserializer.deserialize_any(StringOrMap)
226 }
227}
228
229impl ser::Serialize for Compression {
230 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
231 where
232 S: ser::Serializer,
233 {
234 use ser::SerializeMap;
235
236 match self {
237 Compression::None => serializer.serialize_str("none"),
238 Compression::Gzip(gzip_level) => {
239 if *gzip_level != CompressionLevel::Default {
240 let mut map = serializer.serialize_map(None)?;
241 map.serialize_entry("algorithm", "gzip")?;
242 map.serialize_entry("level", &gzip_level)?;
243 map.end()
244 } else {
245 serializer.serialize_str("gzip")
246 }
247 }
248 Compression::Zlib(zlib_level) => {
249 if *zlib_level != CompressionLevel::Default {
250 let mut map = serializer.serialize_map(None)?;
251 map.serialize_entry("algorithm", "zlib")?;
252 map.serialize_entry("level", &zlib_level)?;
253 map.end()
254 } else {
255 serializer.serialize_str("zlib")
256 }
257 }
258 Compression::Zstd(zstd_level) => {
259 if *zstd_level != CompressionLevel::Default {
260 let mut map = serializer.serialize_map(None)?;
261 map.serialize_entry("algorithm", "zstd")?;
262 map.serialize_entry("level", &zstd_level)?;
263 map.end()
264 } else {
265 serializer.serialize_str("zstd")
266 }
267 }
268 Compression::Snappy => serializer.serialize_str("snappy"),
269 }
270 }
271}
272
273pub const ALGORITHM_NAME: &str = "algorithm";
274pub const LEVEL_NAME: &str = "level";
275pub const LOGICAL_NAME: &str = "logical_name";
276pub const ENUM_TAGGING_MODE: &str = "docs::enum_tagging";
277
278pub fn generate_string_schema(
279 logical_name: &str,
280 title: Option<&'static str>,
281 description: &'static str,
282) -> SchemaObject {
283 let mut const_schema = generate_const_string_schema(logical_name.to_lowercase());
284 let mut const_metadata = Metadata::with_description(description);
285 if let Some(title) = title {
286 const_metadata.set_title(title);
287 }
288 const_metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));
289 apply_base_metadata(&mut const_schema, const_metadata);
290 const_schema
291}
292
293impl Configurable for Compression {
295 fn referenceable_name() -> Option<&'static str> {
296 Some(std::any::type_name::<Self>())
297 }
298
299 fn metadata() -> Metadata {
300 let mut metadata = Metadata::default();
301 metadata.set_title("Compression configuration.");
302 metadata.set_description("All compression algorithms use the default compression level unless otherwise specified.");
303 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
304 metadata.add_custom_attribute(CustomAttribute::flag("docs::advanced"));
305 metadata
306 }
307
308 fn generate_schema(
309 generator: &RefCell<SchemaGenerator>,
310 ) -> Result<SchemaObject, GenerateError> {
311 let mut string_metadata = Metadata::with_description("Compression algorithm.");
314 string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
315
316 let none_string_subschema = generate_string_schema("None", None, "No compression.");
317 let gzip_string_subschema = generate_string_schema(
318 "Gzip",
319 Some("[Gzip][gzip] compression."),
320 "[gzip]: https://www.gzip.org/",
321 );
322 let zlib_string_subschema = generate_string_schema(
323 "Zlib",
324 Some("[Zlib][zlib] compression."),
325 "[zlib]: https://zlib.net/",
326 );
327
328 let zstd_string_subschema = generate_string_schema(
329 "Zstd",
330 Some("[Zstandard][zstd] compression."),
331 "[zstd]: https://facebook.github.io/zstd/",
332 );
333
334 let snappy_string_subschema = generate_string_schema(
335 "Snappy",
336 Some("[Snappy][snappy] compression."),
337 "[snappy]: https://github.com/google/snappy/blob/main/docs/README.md",
338 );
339
340 let mut all_string_oneof_subschema = generate_one_of_schema(&[
341 none_string_subschema,
342 gzip_string_subschema,
343 zlib_string_subschema,
344 zstd_string_subschema,
345 snappy_string_subschema,
346 ]);
347 apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
348
349 let compression_level_schema =
363 get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
364
365 let mut required = BTreeSet::new();
366 required.insert(ALGORITHM_NAME.to_string());
367
368 let mut properties = IndexMap::new();
369 properties.insert(
370 ALGORITHM_NAME.to_string(),
371 all_string_oneof_subschema.clone(),
372 );
373 properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
374
375 let mut full_subschema = generate_struct_schema(properties, required, None);
376 let mut full_metadata =
377 Metadata::with_description("Compression algorithm and compression level.");
378 full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
379 apply_base_metadata(&mut full_subschema, full_metadata);
380
381 Ok(generate_one_of_schema(&[
383 all_string_oneof_subschema,
384 full_subschema,
385 ]))
386 }
387}
388
389impl ToValue for Compression {
390 fn to_value(&self) -> Value {
391 serde_json::to_value(self).expect("Could not convert compression settings to JSON")
392 }
393}
394
395#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
397pub enum CompressionLevel {
398 None,
399 #[default]
400 Default,
401 Best,
402 Fast,
403 Val(u32),
404}
405
406impl CompressionLevel {
407 pub const fn const_default() -> Self {
408 CompressionLevel::Default
409 }
410
411 pub fn as_flate2(self) -> flate2::Compression {
412 match self {
413 CompressionLevel::None => flate2::Compression::none(),
414 CompressionLevel::Default => flate2::Compression::default(),
415 CompressionLevel::Best => flate2::Compression::best(),
416 CompressionLevel::Fast => flate2::Compression::fast(),
417 CompressionLevel::Val(level) => flate2::Compression::new(level),
418 }
419 }
420}
421
422impl<'de> de::Deserialize<'de> for CompressionLevel {
423 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
424 where
425 D: de::Deserializer<'de>,
426 {
427 struct NumberOrString;
428
429 impl de::Visitor<'_> for NumberOrString {
430 type Value = CompressionLevel;
431
432 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
433 f.write_str("unsigned number or string")
434 }
435
436 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
437 where
438 E: de::Error,
439 {
440 match s {
441 "none" => Ok(CompressionLevel::None),
442 "fast" => Ok(CompressionLevel::Fast),
443 "default" => Ok(CompressionLevel::Default),
444 "best" => Ok(CompressionLevel::Best),
445 level => Err(de::Error::invalid_value(
446 de::Unexpected::Str(level),
447 &r#""none", "fast", "best" or "default""#,
448 )),
449 }
450 }
451
452 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
453 where
454 E: de::Error,
455 {
456 u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
457 de::Error::custom(format!(
458 "unsigned integer could not be converted to u32: {err}"
459 ))
460 })
461 }
462
463 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
464 where
465 E: de::Error,
466 {
467 u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
468 de::Error::custom(format!("integer could not be converted to u32: {err}"))
469 })
470 }
471 }
472
473 deserializer.deserialize_any(NumberOrString)
474 }
475}
476
477impl ser::Serialize for CompressionLevel {
478 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
479 where
480 S: ser::Serializer,
481 {
482 match *self {
483 CompressionLevel::None => serializer.serialize_str("none"),
484 CompressionLevel::Default => serializer.serialize_str("default"),
485 CompressionLevel::Best => serializer.serialize_str("best"),
486 CompressionLevel::Fast => serializer.serialize_str("fast"),
487 CompressionLevel::Val(level) => serializer.serialize_u64(u64::from(level)),
488 }
489 }
490}
491
492impl Configurable for CompressionLevel {
494 fn referenceable_name() -> Option<&'static str> {
495 Some(std::any::type_name::<Self>())
496 }
497
498 fn metadata() -> Metadata {
499 let mut metadata = Metadata::default();
500 metadata.set_description("Compression level.");
501 metadata
502 }
503
504 fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
505 let string_consts = ["none", "fast", "best", "default"]
506 .iter()
507 .map(|s| serde_json::Value::from(*s));
508
509 let level_consts = (0u32..=21).map(serde_json::Value::from);
510
511 let valid_values = string_consts.chain(level_consts).collect();
512 Ok(generate_enum_schema(valid_values))
513 }
514}
515
516impl ToValue for CompressionLevel {
517 fn to_value(&self) -> Value {
518 serde_json::to_value(self).expect("Could not convert compression level to JSON")
520 }
521}
522
#[cfg(test)]
mod test {
    use super::{Compression, CompressionLevel};

    #[test]
    fn deserialization_json() {
        let fixtures_valid = [
            (r#""none""#, Compression::None),
            (r#""gzip""#, Compression::Gzip(CompressionLevel::default())),
            (r#""zlib""#, Compression::Zlib(CompressionLevel::default())),
            (r#""zstd""#, Compression::Zstd(CompressionLevel::default())),
            (r#""snappy""#, Compression::Snappy),
            (r#"{"algorithm": "none"}"#, Compression::None),
            (r#"{"algorithm": "snappy"}"#, Compression::Snappy),
            (
                r#"{"algorithm": "gzip"}"#,
                Compression::Gzip(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "gzip", "level": "best"}"#,
                Compression::Gzip(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "gzip", "level": 8}"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            // Upper bound for gzip is inclusive.
            (
                r#"{"algorithm": "gzip", "level": 9}"#,
                Compression::Gzip(CompressionLevel::Val(9)),
            ),
            (
                r#"{"algorithm": "zlib"}"#,
                Compression::Zlib(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zlib", "level": "best"}"#,
                Compression::Zlib(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "zlib", "level": 8}"#,
                Compression::Zlib(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zstd"}"#,
                Compression::Zstd(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zstd", "level": "fast"}"#,
                Compression::Zstd(CompressionLevel::Fast),
            ),
            // Upper bound for zstd is inclusive.
            (
                r#"{"algorithm": "zstd", "level": 21}"#,
                Compression::Zstd(CompressionLevel::Val(21)),
            ),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }

        let fixtures_invalid = [
            (
                r"42",
                r"invalid type: integer `42`, expected string or map at line 1 column 2",
            ),
            (
                r#""b42""#,
                r#"invalid value: string "b42", expected "none" or "gzip" or "zlib" or "zstd" at line 1 column 5"#,
            ),
            (
                r#"{"algorithm": "b42"}"#,
                r"unknown variant `b42`, expected one of `none`, `gzip`, `zlib`, `zstd`, `snappy` at line 1 column 20",
            ),
            (
                r#"{"algorithm": "none", "level": "default"}"#,
                r"unknown field `level`, there are no fields at line 1 column 41",
            ),
            (
                r#"{"algorithm": "gzip", "level": -1}"#,
                r"integer could not be converted to u32: out of range integral type conversion attempted at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "good"}"#,
                r#"invalid value: string "good", expected "none", "fast", "best" or "default" at line 1 column 37"#,
            ),
            (
                r#"{"algorithm": "gzip", "level": {}}"#,
                r"invalid type: map, expected unsigned number or string at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "default", "key": 42}"#,
                r"unknown field `key`, expected `algorithm` or `level` at line 1 column 47",
            ),
            (
                r#"{"algorithm": "gzip", "level": 10}"#,
                r"invalid value `10`, expected value in range [0, 9] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "zstd", "level": 22}"#,
                r"invalid value `22`, expected value in range [0, 21] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "snappy", "level": 3}"#,
                r"unknown field `level`, there are no fields at line 1 column 35",
            ),
            (
                r#"{"level": 5}"#,
                r"missing field `algorithm` at line 1 column 12",
            ),
            (
                r#"{"algorithm": "gzip", "algorithm": "zlib"}"#,
                r"duplicate field `algorithm` at line 1 column 33",
            ),
        ];
        for (source, expected) in fixtures_invalid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            let error = deserialized.expect_err("invalid source");
            assert_eq!(error.to_string().as_str(), *expected, "source: {source}");
        }
    }

    #[test]
    fn deserialization_toml() {
        let fixtures_valid = [
            (
                "algorithm = \"gzip\"\nlevel = 8",
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                "algorithm = \"zstd\"\nlevel = \"best\"",
                Compression::Zstd(CompressionLevel::Best),
            ),
            ("algorithm = \"snappy\"", Compression::Snappy),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = toml::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }
    }

    /// Serializing and then deserializing must reproduce the original value exactly.
    #[test]
    fn from_and_to_value() {
        let fixtures_valid = [
            Compression::None,
            Compression::Snappy,
            Compression::Gzip(CompressionLevel::default()),
            Compression::Gzip(CompressionLevel::None),
            Compression::Gzip(CompressionLevel::Val(7)),
            Compression::Zlib(CompressionLevel::default()),
            Compression::Zlib(CompressionLevel::Best),
            Compression::Zlib(CompressionLevel::Val(7)),
            Compression::Zstd(CompressionLevel::Val(6)),
            Compression::Zstd(CompressionLevel::default()),
            Compression::Zstd(CompressionLevel::Best),
            Compression::Zstd(CompressionLevel::Fast),
        ];

        for expected in fixtures_valid {
            let value = serde_json::to_value(expected).unwrap();
            let round_tripped = serde_json::from_value::<Compression>(value).unwrap();
            assert_eq!(round_tripped, expected);
        }
    }
}