vector/sinks/gcp_chronicle/
compression.rs

1use std::{cell::RefCell, collections::BTreeSet};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::{
7    Configurable, GenerateError, Metadata, ToValue,
8    attributes::CustomAttribute,
9    schema::{
10        SchemaGenerator, SchemaObject, apply_base_metadata, generate_one_of_schema,
11        generate_struct_schema, get_or_generate_schema,
12    },
13};
14
15use crate::sinks::util::{
16    Compression,
17    buffer::compression::{
18        ALGORITHM_NAME, CompressionLevel, ENUM_TAGGING_MODE, LEVEL_NAME, generate_string_schema,
19    },
20};
21
22/// Compression configuration.
23#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
24#[derivative(Default)]
25pub enum ChronicleCompression {
26    /// No compression.
27    #[derivative(Default)]
28    None,
29
30    /// [Gzip][gzip] compression.
31    ///
32    /// [gzip]: https://www.gzip.org/
33    Gzip(CompressionLevel),
34}
35
36impl From<ChronicleCompression> for Compression {
37    fn from(compression: ChronicleCompression) -> Self {
38        match compression {
39            ChronicleCompression::None => Compression::None,
40            ChronicleCompression::Gzip(compression_level) => Compression::Gzip(compression_level),
41        }
42    }
43}
44
45impl TryFrom<Compression> for ChronicleCompression {
46    type Error = String;
47
48    fn try_from(compression: Compression) -> Result<Self, Self::Error> {
49        match compression {
50            Compression::None => Ok(ChronicleCompression::None),
51            Compression::Gzip(compression_level) => {
52                Ok(ChronicleCompression::Gzip(compression_level))
53            }
54            _ => Err("Compression type is not supported by Chronicle".to_string()),
55        }
56    }
57}
58
59// Schema generation largely copied from `src/sinks/util/buffer/compression`
60impl Configurable for ChronicleCompression {
61    fn metadata() -> Metadata {
62        Compression::metadata()
63    }
64
65    fn generate_schema(
66        generator: &RefCell<SchemaGenerator>,
67    ) -> Result<SchemaObject, GenerateError> {
68        // First, we'll create the string-only subschemas for each algorithm, and wrap those up
69        // within a one-of schema.
70        let mut string_metadata = Metadata::with_description("Compression algorithm.");
71        string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
72
73        let none_string_subschema = generate_string_schema("None", None, "No compression.");
74        let gzip_string_subschema = generate_string_schema(
75            "Gzip",
76            Some("[Gzip][gzip] compression."),
77            "[gzip]: https://www.gzip.org/",
78        );
79
80        let mut all_string_oneof_subschema =
81            generate_one_of_schema(&[none_string_subschema, gzip_string_subschema]);
82        apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
83
84        let compression_level_schema =
85            get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
86
87        let mut required = BTreeSet::new();
88        required.insert(ALGORITHM_NAME.to_string());
89
90        let mut properties = IndexMap::new();
91        properties.insert(
92            ALGORITHM_NAME.to_string(),
93            all_string_oneof_subschema.clone(),
94        );
95        properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
96
97        let mut full_subschema = generate_struct_schema(properties, required, None);
98        let mut full_metadata =
99            Metadata::with_description("Compression algorithm and compression level.");
100        full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
101        apply_base_metadata(&mut full_subschema, full_metadata);
102
103        Ok(generate_one_of_schema(&[
104            all_string_oneof_subschema,
105            full_subschema,
106        ]))
107    }
108}
109
110impl ToValue for ChronicleCompression {
111    fn to_value(&self) -> Value {
112        serde_json::to_value(Compression::from(*self))
113            .expect("Could not convert compression settings to JSON")
114    }
115}
116
117impl<'de> de::Deserialize<'de> for ChronicleCompression {
118    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
119    where
120        D: de::Deserializer<'de>,
121    {
122        Compression::deserialize(deserializer)
123            .and_then(|x| ChronicleCompression::try_from(x).map_err(de::Error::custom))
124    }
125}
126
127impl ser::Serialize for ChronicleCompression {
128    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129    where
130        S: ser::Serializer,
131    {
132        Compression::serialize(&Compression::from(*self), serializer)
133    }
134}