vector/sinks/gcp_chronicle/
compression.rs

1use serde::{de, ser};
2use serde_json::Value;
3use std::{cell::RefCell, collections::BTreeSet};
4use vector_lib::configurable::ToValue;
5
6use indexmap::IndexMap;
7use vector_lib::configurable::attributes::CustomAttribute;
8use vector_lib::configurable::{
9    schema::{
10        apply_base_metadata, generate_one_of_schema, generate_struct_schema,
11        get_or_generate_schema, SchemaGenerator, SchemaObject,
12    },
13    Configurable, GenerateError, Metadata,
14};
15
16use crate::sinks::util::buffer::compression::{
17    generate_string_schema, CompressionLevel, ALGORITHM_NAME, ENUM_TAGGING_MODE, LEVEL_NAME,
18};
19use crate::sinks::util::Compression;
20
21/// Compression configuration.
22#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
23#[derivative(Default)]
24pub enum ChronicleCompression {
25    /// No compression.
26    #[derivative(Default)]
27    None,
28
29    /// [Gzip][gzip] compression.
30    ///
31    /// [gzip]: https://www.gzip.org/
32    Gzip(CompressionLevel),
33}
34
35impl From<ChronicleCompression> for Compression {
36    fn from(compression: ChronicleCompression) -> Self {
37        match compression {
38            ChronicleCompression::None => Compression::None,
39            ChronicleCompression::Gzip(compression_level) => Compression::Gzip(compression_level),
40        }
41    }
42}
43
44impl TryFrom<Compression> for ChronicleCompression {
45    type Error = String;
46
47    fn try_from(compression: Compression) -> Result<Self, Self::Error> {
48        match compression {
49            Compression::None => Ok(ChronicleCompression::None),
50            Compression::Gzip(compression_level) => {
51                Ok(ChronicleCompression::Gzip(compression_level))
52            }
53            _ => Err("Compression type is not supported by Chronicle".to_string()),
54        }
55    }
56}
57
58// Schema generation largely copied from `src/sinks/util/buffer/compression`
59impl Configurable for ChronicleCompression {
60    fn metadata() -> Metadata {
61        Compression::metadata()
62    }
63
64    fn generate_schema(
65        generator: &RefCell<SchemaGenerator>,
66    ) -> Result<SchemaObject, GenerateError> {
67        // First, we'll create the string-only subschemas for each algorithm, and wrap those up
68        // within a one-of schema.
69        let mut string_metadata = Metadata::with_description("Compression algorithm.");
70        string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
71
72        let none_string_subschema = generate_string_schema("None", None, "No compression.");
73        let gzip_string_subschema = generate_string_schema(
74            "Gzip",
75            Some("[Gzip][gzip] compression."),
76            "[gzip]: https://www.gzip.org/",
77        );
78
79        let mut all_string_oneof_subschema =
80            generate_one_of_schema(&[none_string_subschema, gzip_string_subschema]);
81        apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
82
83        let compression_level_schema =
84            get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
85
86        let mut required = BTreeSet::new();
87        required.insert(ALGORITHM_NAME.to_string());
88
89        let mut properties = IndexMap::new();
90        properties.insert(
91            ALGORITHM_NAME.to_string(),
92            all_string_oneof_subschema.clone(),
93        );
94        properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
95
96        let mut full_subschema = generate_struct_schema(properties, required, None);
97        let mut full_metadata =
98            Metadata::with_description("Compression algorithm and compression level.");
99        full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
100        apply_base_metadata(&mut full_subschema, full_metadata);
101
102        Ok(generate_one_of_schema(&[
103            all_string_oneof_subschema,
104            full_subschema,
105        ]))
106    }
107}
108
109impl ToValue for ChronicleCompression {
110    fn to_value(&self) -> Value {
111        serde_json::to_value(Compression::from(*self))
112            .expect("Could not convert compression settings to JSON")
113    }
114}
115
116impl<'de> de::Deserialize<'de> for ChronicleCompression {
117    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
118    where
119        D: de::Deserializer<'de>,
120    {
121        Compression::deserialize(deserializer)
122            .and_then(|x| ChronicleCompression::try_from(x).map_err(de::Error::custom))
123    }
124}
125
126impl ser::Serialize for ChronicleCompression {
127    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
128    where
129        S: ser::Serializer,
130    {
131        Compression::serialize(&Compression::from(*self), serializer)
132    }
133}