1use std::{cell::RefCell, collections::BTreeSet, fmt};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::{
7 Configurable, GenerateError, Metadata, ToValue,
8 attributes::CustomAttribute,
9 schema::{
10 SchemaGenerator, SchemaObject, apply_base_metadata, generate_const_string_schema,
11 generate_enum_schema, generate_one_of_schema, generate_struct_schema,
12 get_or_generate_schema,
13 },
14};
15
16use crate::sinks::util::zstd::ZstdCompressionLevel;
17
18#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
20pub enum Compression {
21 #[default]
23 None,
24
25 Gzip(CompressionLevel),
29
30 Zlib(CompressionLevel),
34
35 Zstd(CompressionLevel),
39
40 Snappy,
44}
45
46impl Compression {
47 pub const fn is_compressed(&self) -> bool {
56 !matches!(self, Compression::None)
57 }
58
59 pub const fn gzip_default() -> Compression {
60 Compression::Gzip(CompressionLevel::const_default())
61 }
62
63 pub const fn zlib_default() -> Compression {
64 Compression::Zlib(CompressionLevel::const_default())
65 }
66
67 pub const fn zstd_default() -> Compression {
68 Compression::Zstd(CompressionLevel::const_default())
69 }
70
71 pub const fn content_encoding(self) -> Option<&'static str> {
72 match self {
73 Self::None => None,
74 Self::Gzip(_) => Some("gzip"),
75 Self::Zlib(_) => Some("deflate"),
76 Self::Zstd(_) => Some("zstd"),
77 Self::Snappy => Some("snappy"),
78 }
79 }
80
81 pub const fn accept_encoding(self) -> Option<&'static str> {
82 match self {
83 Self::Gzip(_) => Some("gzip"),
84 Self::Zlib(_) => Some("deflate"),
85 Self::Zstd(_) => Some("zstd"),
86 Self::Snappy => Some("snappy"),
87 _ => None,
88 }
89 }
90
91 pub const fn extension(self) -> &'static str {
92 match self {
93 Self::None => "log",
94 Self::Gzip(_) => "log.gz",
95 Self::Zlib(_) => "log.zz",
96 Self::Zstd(_) => "log.zst",
97 Self::Snappy => "log.snappy",
98 }
99 }
100
101 pub const fn max_compression_level_val(self) -> u32 {
102 match self {
103 Compression::None => 0,
104 Compression::Gzip(_) => 9,
105 Compression::Zlib(_) => 9,
106 Compression::Zstd(_) => 21,
107 Compression::Snappy => 0,
108 }
109 }
110
111 pub const fn compression_level(self) -> CompressionLevel {
112 match self {
113 Self::None | Self::Snappy => CompressionLevel::None,
114 Self::Gzip(level) | Self::Zlib(level) | Self::Zstd(level) => level,
115 }
116 }
117}
118
119impl fmt::Display for Compression {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 match *self {
122 Compression::None => write!(f, "none"),
123 Compression::Gzip(ref level) => write!(f, "gzip({})", level.as_flate2().level()),
124 Compression::Zlib(ref level) => write!(f, "zlib({})", level.as_flate2().level()),
125 Compression::Zstd(ref level) => {
126 write!(f, "zstd({})", ZstdCompressionLevel::from(*level))
127 }
128 Compression::Snappy => write!(f, "snappy"),
129 }
130 }
131}
132
133impl<'de> de::Deserialize<'de> for Compression {
134 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
135 where
136 D: de::Deserializer<'de>,
137 {
138 struct StringOrMap;
139
140 impl<'de> de::Visitor<'de> for StringOrMap {
141 type Value = Compression;
142
143 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
144 f.write_str("string or map")
145 }
146
147 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
148 where
149 E: de::Error,
150 {
151 match s {
152 "none" => Ok(Compression::None),
153 "gzip" => Ok(Compression::gzip_default()),
154 "zlib" => Ok(Compression::zlib_default()),
155 "zstd" => Ok(Compression::zstd_default()),
156 "snappy" => Ok(Compression::Snappy),
157 _ => Err(de::Error::invalid_value(
158 de::Unexpected::Str(s),
159 &r#""none" or "gzip" or "zlib" or "zstd""#,
160 )),
161 }
162 }
163
164 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
165 where
166 A: de::MapAccess<'de>,
167 {
168 let mut algorithm = None;
169 let mut level = None;
170
171 while let Some(key) = map.next_key::<String>()? {
172 match key.as_str() {
173 "algorithm" => {
174 if algorithm.is_some() {
175 return Err(de::Error::duplicate_field("algorithm"));
176 }
177 algorithm = Some(map.next_value::<String>()?);
178 }
179 "level" => {
180 if level.is_some() {
181 return Err(de::Error::duplicate_field("level"));
182 }
183 level = Some(map.next_value::<CompressionLevel>()?);
184 }
185 _ => return Err(de::Error::unknown_field(&key, &["algorithm", "level"])),
186 };
187 }
188
189 let compression = match algorithm
190 .ok_or_else(|| de::Error::missing_field("algorithm"))?
191 .as_str()
192 {
193 "none" => match level {
194 Some(_) => Err(de::Error::unknown_field("level", &[])),
195 None => Ok(Compression::None),
196 },
197 "gzip" => Ok(Compression::Gzip(level.unwrap_or_default())),
198 "zlib" => Ok(Compression::Zlib(level.unwrap_or_default())),
199 "zstd" => Ok(Compression::Zstd(level.unwrap_or_default())),
200 "snappy" => match level {
201 Some(_) => Err(de::Error::unknown_field("level", &[])),
202 None => Ok(Compression::Snappy),
203 },
204 algorithm => Err(de::Error::unknown_variant(
205 algorithm,
206 &["none", "gzip", "zlib", "zstd", "snappy"],
207 )),
208 }?;
209
210 if let CompressionLevel::Val(level) = compression.compression_level() {
211 let max_level = compression.max_compression_level_val();
212 if level > max_level {
213 let msg = std::format!(
214 "invalid value `{level}`, expected value in range [0, {max_level}]"
215 );
216 return Err(de::Error::custom(msg));
217 }
218 }
219
220 Ok(compression)
221 }
222 }
223
224 deserializer.deserialize_any(StringOrMap)
225 }
226}
227
228impl ser::Serialize for Compression {
229 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
230 where
231 S: ser::Serializer,
232 {
233 use ser::SerializeMap;
234
235 match self {
236 Compression::None => serializer.serialize_str("none"),
237 Compression::Gzip(gzip_level) => {
238 if *gzip_level != CompressionLevel::Default {
239 let mut map = serializer.serialize_map(None)?;
240 map.serialize_entry("algorithm", "gzip")?;
241 map.serialize_entry("level", &gzip_level)?;
242 map.end()
243 } else {
244 serializer.serialize_str("gzip")
245 }
246 }
247 Compression::Zlib(zlib_level) => {
248 if *zlib_level != CompressionLevel::Default {
249 let mut map = serializer.serialize_map(None)?;
250 map.serialize_entry("algorithm", "zlib")?;
251 map.serialize_entry("level", &zlib_level)?;
252 map.end()
253 } else {
254 serializer.serialize_str("zlib")
255 }
256 }
257 Compression::Zstd(zstd_level) => {
258 if *zstd_level != CompressionLevel::Default {
259 let mut map = serializer.serialize_map(None)?;
260 map.serialize_entry("algorithm", "zstd")?;
261 map.serialize_entry("level", &zstd_level)?;
262 map.end()
263 } else {
264 serializer.serialize_str("zstd")
265 }
266 }
267 Compression::Snappy => serializer.serialize_str("snappy"),
268 }
269 }
270}
271
/// Key naming the compression algorithm in the map form of the configuration.
pub const ALGORITHM_NAME: &'static str = "algorithm";
/// Key naming the compression level in the map form of the configuration.
pub const LEVEL_NAME: &'static str = "level";
/// Custom schema attribute key holding an algorithm's original-case name (e.g. `Gzip`).
pub const LOGICAL_NAME: &'static str = "logical_name";
/// Custom schema attribute key for the enum tagging mode (set to `"external"` here).
pub const ENUM_TAGGING_MODE: &'static str = "docs::enum_tagging";
276
277pub fn generate_string_schema(
278 logical_name: &str,
279 title: Option<&'static str>,
280 description: &'static str,
281) -> SchemaObject {
282 let mut const_schema = generate_const_string_schema(logical_name.to_lowercase());
283 let mut const_metadata = Metadata::with_description(description);
284 if let Some(title) = title {
285 const_metadata.set_title(title);
286 }
287 const_metadata.add_custom_attribute(CustomAttribute::kv(LOGICAL_NAME, logical_name));
288 apply_base_metadata(&mut const_schema, const_metadata);
289 const_schema
290}
291
292impl Configurable for Compression {
294 fn referenceable_name() -> Option<&'static str> {
295 Some(std::any::type_name::<Self>())
296 }
297
298 fn metadata() -> Metadata {
299 let mut metadata = Metadata::default();
300 metadata.set_title("Compression configuration.");
301 metadata.set_description("All compression algorithms use the default compression level unless otherwise specified.");
302 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "external"));
303 metadata.add_custom_attribute(CustomAttribute::flag("docs::advanced"));
304 metadata
305 }
306
307 fn generate_schema(
308 generator: &RefCell<SchemaGenerator>,
309 ) -> Result<SchemaObject, GenerateError> {
310 let mut string_metadata = Metadata::with_description("Compression algorithm.");
313 string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
314
315 let none_string_subschema = generate_string_schema("None", None, "No compression.");
316 let gzip_string_subschema = generate_string_schema(
317 "Gzip",
318 Some("[Gzip][gzip] compression."),
319 "[gzip]: https://www.gzip.org/",
320 );
321 let zlib_string_subschema = generate_string_schema(
322 "Zlib",
323 Some("[Zlib][zlib] compression."),
324 "[zlib]: https://zlib.net/",
325 );
326
327 let zstd_string_subschema = generate_string_schema(
328 "Zstd",
329 Some("[Zstandard][zstd] compression."),
330 "[zstd]: https://facebook.github.io/zstd/",
331 );
332
333 let snappy_string_subschema = generate_string_schema(
334 "Snappy",
335 Some("[Snappy][snappy] compression."),
336 "[snappy]: https://github.com/google/snappy/blob/main/docs/README.md",
337 );
338
339 let mut all_string_oneof_subschema = generate_one_of_schema(&[
340 none_string_subschema,
341 gzip_string_subschema,
342 zlib_string_subschema,
343 zstd_string_subschema,
344 snappy_string_subschema,
345 ]);
346 apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
347
348 let compression_level_schema =
362 get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
363
364 let mut required = BTreeSet::new();
365 required.insert(ALGORITHM_NAME.to_string());
366
367 let mut properties = IndexMap::new();
368 properties.insert(
369 ALGORITHM_NAME.to_string(),
370 all_string_oneof_subschema.clone(),
371 );
372 properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
373
374 let mut full_subschema = generate_struct_schema(properties, required, None);
375 let mut full_metadata =
376 Metadata::with_description("Compression algorithm and compression level.");
377 full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
378 apply_base_metadata(&mut full_subschema, full_metadata);
379
380 Ok(generate_one_of_schema(&[
382 all_string_oneof_subschema,
383 full_subschema,
384 ]))
385 }
386}
387
388impl ToValue for Compression {
389 fn to_value(&self) -> Value {
390 serde_json::to_value(self).expect("Could not convert compression settings to JSON")
391 }
392}
393
/// Compression level.
///
/// Named levels are translated into algorithm-specific settings (see
/// [`CompressionLevel::as_flate2`]). `Val` carries an explicit number, and its valid
/// range depends on the algorithm. The default is `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CompressionLevel {
    /// The `none` level.
    None,
    /// The algorithm's default level.
    #[default]
    Default,
    /// The `best` level.
    Best,
    /// The `fast` level.
    Fast,
    /// An explicit numeric level.
    Val(u32),
}
404
405impl CompressionLevel {
406 pub const fn const_default() -> Self {
407 CompressionLevel::Default
408 }
409
410 pub fn as_flate2(self) -> flate2::Compression {
411 match self {
412 CompressionLevel::None => flate2::Compression::none(),
413 CompressionLevel::Default => flate2::Compression::default(),
414 CompressionLevel::Best => flate2::Compression::best(),
415 CompressionLevel::Fast => flate2::Compression::fast(),
416 CompressionLevel::Val(level) => flate2::Compression::new(level),
417 }
418 }
419}
420
421impl<'de> de::Deserialize<'de> for CompressionLevel {
422 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
423 where
424 D: de::Deserializer<'de>,
425 {
426 struct NumberOrString;
427
428 impl de::Visitor<'_> for NumberOrString {
429 type Value = CompressionLevel;
430
431 fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
432 f.write_str("unsigned number or string")
433 }
434
435 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
436 where
437 E: de::Error,
438 {
439 match s {
440 "none" => Ok(CompressionLevel::None),
441 "fast" => Ok(CompressionLevel::Fast),
442 "default" => Ok(CompressionLevel::Default),
443 "best" => Ok(CompressionLevel::Best),
444 level => Err(de::Error::invalid_value(
445 de::Unexpected::Str(level),
446 &r#""none", "fast", "best" or "default""#,
447 )),
448 }
449 }
450
451 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
452 where
453 E: de::Error,
454 {
455 u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
456 de::Error::custom(format!(
457 "unsigned integer could not be converted to u32: {err}"
458 ))
459 })
460 }
461
462 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
463 where
464 E: de::Error,
465 {
466 u32::try_from(v).map(CompressionLevel::Val).map_err(|err| {
467 de::Error::custom(format!("integer could not be converted to u32: {err}"))
468 })
469 }
470 }
471
472 deserializer.deserialize_any(NumberOrString)
473 }
474}
475
476impl ser::Serialize for CompressionLevel {
477 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
478 where
479 S: ser::Serializer,
480 {
481 match *self {
482 CompressionLevel::None => serializer.serialize_str("none"),
483 CompressionLevel::Default => serializer.serialize_str("default"),
484 CompressionLevel::Best => serializer.serialize_str("best"),
485 CompressionLevel::Fast => serializer.serialize_str("fast"),
486 CompressionLevel::Val(level) => serializer.serialize_u64(u64::from(level)),
487 }
488 }
489}
490
491impl Configurable for CompressionLevel {
493 fn referenceable_name() -> Option<&'static str> {
494 Some(std::any::type_name::<Self>())
495 }
496
497 fn metadata() -> Metadata {
498 let mut metadata = Metadata::default();
499 metadata.set_description("Compression level.");
500 metadata
501 }
502
503 fn generate_schema(_: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
504 let string_consts = ["none", "fast", "best", "default"]
505 .iter()
506 .map(|s| serde_json::Value::from(*s));
507
508 let level_consts = (0u32..=21).map(serde_json::Value::from);
509
510 let valid_values = string_consts.chain(level_consts).collect();
511 Ok(generate_enum_schema(valid_values))
512 }
513}
514
515impl ToValue for CompressionLevel {
516 fn to_value(&self) -> Value {
517 serde_json::to_value(self).expect("Could not convert compression level to JSON")
519 }
520}
521
#[cfg(test)]
mod test {
    use super::{Compression, CompressionLevel};

    #[test]
    fn deserialization_json() {
        let fixtures_valid = [
            (r#""none""#, Compression::None),
            (r#""gzip""#, Compression::Gzip(CompressionLevel::default())),
            (r#""zlib""#, Compression::Zlib(CompressionLevel::default())),
            (r#""zstd""#, Compression::Zstd(CompressionLevel::default())),
            (r#""snappy""#, Compression::Snappy),
            (r#"{"algorithm": "none"}"#, Compression::None),
            (
                r#"{"algorithm": "gzip"}"#,
                Compression::Gzip(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "gzip", "level": "best"}"#,
                Compression::Gzip(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "gzip", "level": 8}"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zlib"}"#,
                Compression::Zlib(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zlib", "level": "best"}"#,
                Compression::Zlib(CompressionLevel::Best),
            ),
            (
                r#"{"algorithm": "zlib", "level": 8}"#,
                Compression::Zlib(CompressionLevel::Val(8)),
            ),
            (
                r#"{"algorithm": "zstd"}"#,
                Compression::Zstd(CompressionLevel::default()),
            ),
            (
                r#"{"algorithm": "zstd", "level": "fast"}"#,
                Compression::Zstd(CompressionLevel::Fast),
            ),
            (
                r#"{"algorithm": "zstd", "level": 21}"#,
                Compression::Zstd(CompressionLevel::Val(21)),
            ),
            (r#"{"algorithm": "snappy"}"#, Compression::Snappy),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }

        let fixtures_invalid = [
            (
                r"42",
                r"invalid type: integer `42`, expected string or map at line 1 column 2",
            ),
            (
                r#""b42""#,
                r#"invalid value: string "b42", expected "none" or "gzip" or "zlib" or "zstd" at line 1 column 5"#,
            ),
            (
                r#"{"algorithm": "b42"}"#,
                r"unknown variant `b42`, expected one of `none`, `gzip`, `zlib`, `zstd`, `snappy` at line 1 column 20",
            ),
            (
                r#"{"algorithm": "none", "level": "default"}"#,
                r"unknown field `level`, there are no fields at line 1 column 41",
            ),
            (
                r#"{"algorithm": "gzip", "level": -1}"#,
                r"integer could not be converted to u32: out of range integral type conversion attempted at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "good"}"#,
                r#"invalid value: string "good", expected "none", "fast", "best" or "default" at line 1 column 37"#,
            ),
            (
                r#"{"algorithm": "gzip", "level": {}}"#,
                r"invalid type: map, expected unsigned number or string at line 1 column 33",
            ),
            (
                r#"{"algorithm": "gzip", "level": "default", "key": 42}"#,
                r"unknown field `key`, expected `algorithm` or `level` at line 1 column 47",
            ),
            (
                r#"{"algorithm": "gzip", "level": 10}"#,
                r"invalid value `10`, expected value in range [0, 9] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "zstd", "level": 22}"#,
                r"invalid value `22`, expected value in range [0, 21] at line 1 column 34",
            ),
            (
                r#"{"algorithm": "snappy", "level": 3}"#,
                r"unknown field `level`, there are no fields at line 1 column 35",
            ),
            (
                r#"{"level": 5}"#,
                r"missing field `algorithm` at line 1 column 12",
            ),
            (
                r#"{"algorithm": "gzip", "algorithm": "zlib"}"#,
                r"duplicate field `algorithm` at line 1 column 33",
            ),
        ];
        for (source, expected) in fixtures_invalid.iter() {
            let deserialized: Result<Compression, _> = serde_json::from_str(source);
            let error = deserialized.expect_err("invalid source");
            assert_eq!(error.to_string().as_str(), *expected, "source: {source}");
        }
    }

    #[test]
    fn deserialization_toml() {
        let fixtures_valid = [
            (
                r#"algorithm = "gzip"
level = 8"#,
                Compression::Gzip(CompressionLevel::Val(8)),
            ),
            (
                r#"algorithm = "zstd"
level = "best""#,
                Compression::Zstd(CompressionLevel::Best),
            ),
        ];
        for (source, expected) in fixtures_valid.iter() {
            let deserialized: Result<Compression, _> = toml::from_str(source);
            assert_eq!(
                deserialized.expect("valid source"),
                *expected,
                "source: {source}"
            );
        }
    }

    #[test]
    fn serialization_json() {
        let cases = [
            (Compression::None, serde_json::json!("none")),
            (Compression::Snappy, serde_json::json!("snappy")),
            (
                Compression::Gzip(CompressionLevel::Default),
                serde_json::json!("gzip"),
            ),
            (
                Compression::Zstd(CompressionLevel::Val(3)),
                serde_json::json!({"algorithm": "zstd", "level": 3}),
            ),
            (
                Compression::Zlib(CompressionLevel::Fast),
                serde_json::json!({"algorithm": "zlib", "level": "fast"}),
            ),
        ];
        for (compression, expected) in cases {
            assert_eq!(serde_json::to_value(compression).unwrap(), expected);
        }
    }

    #[test]
    fn from_and_to_value() {
        let fixtures_valid = [
            Compression::None,
            Compression::Gzip(CompressionLevel::default()),
            Compression::Gzip(CompressionLevel::Val(7)),
            Compression::Zlib(CompressionLevel::Best),
            Compression::Zlib(CompressionLevel::Val(7)),
            Compression::Zstd(CompressionLevel::Val(6)),
            Compression::Zstd(CompressionLevel::default()),
            Compression::Zstd(CompressionLevel::Best),
            Compression::Zstd(CompressionLevel::Fast),
            Compression::Snappy,
        ];

        // Serialization must be lossless: deserializing the JSON form has to give
        // back exactly the value we started from.
        for expected in fixtures_valid {
            let value = serde_json::to_value(expected).unwrap();
            let round_tripped = serde_json::from_value::<Compression>(value).unwrap();
            assert_eq!(round_tripped, expected);
        }
    }

    #[test]
    fn display() {
        assert_eq!(Compression::None.to_string(), "none");
        assert_eq!(Compression::Snappy.to_string(), "snappy");
        assert_eq!(
            Compression::Gzip(CompressionLevel::Val(7)).to_string(),
            "gzip(7)"
        );
        assert_eq!(
            Compression::Zlib(CompressionLevel::Val(3)).to_string(),
            "zlib(3)"
        );
    }

    #[test]
    fn http_encodings() {
        assert_eq!(Compression::None.content_encoding(), None);
        assert_eq!(Compression::None.accept_encoding(), None);
        for (compression, token) in [
            (Compression::gzip_default(), "gzip"),
            (Compression::zlib_default(), "deflate"),
            (Compression::zstd_default(), "zstd"),
            (Compression::Snappy, "snappy"),
        ] {
            assert_eq!(compression.content_encoding(), Some(token));
            assert_eq!(compression.accept_encoding(), Some(token));
        }
    }
}