vector/sinks/gcp_chronicle/
compression.rs1use serde::{de, ser};
2use serde_json::Value;
3use std::{cell::RefCell, collections::BTreeSet};
4use vector_lib::configurable::ToValue;
5
6use indexmap::IndexMap;
7use vector_lib::configurable::attributes::CustomAttribute;
8use vector_lib::configurable::{
9 schema::{
10 apply_base_metadata, generate_one_of_schema, generate_struct_schema,
11 get_or_generate_schema, SchemaGenerator, SchemaObject,
12 },
13 Configurable, GenerateError, Metadata,
14};
15
16use crate::sinks::util::buffer::compression::{
17 generate_string_schema, CompressionLevel, ALGORITHM_NAME, ENUM_TAGGING_MODE, LEVEL_NAME,
18};
19use crate::sinks::util::Compression;
20
21#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
23#[derivative(Default)]
24pub enum ChronicleCompression {
25 #[derivative(Default)]
27 None,
28
29 Gzip(CompressionLevel),
33}
34
35impl From<ChronicleCompression> for Compression {
36 fn from(compression: ChronicleCompression) -> Self {
37 match compression {
38 ChronicleCompression::None => Compression::None,
39 ChronicleCompression::Gzip(compression_level) => Compression::Gzip(compression_level),
40 }
41 }
42}
43
44impl TryFrom<Compression> for ChronicleCompression {
45 type Error = String;
46
47 fn try_from(compression: Compression) -> Result<Self, Self::Error> {
48 match compression {
49 Compression::None => Ok(ChronicleCompression::None),
50 Compression::Gzip(compression_level) => {
51 Ok(ChronicleCompression::Gzip(compression_level))
52 }
53 _ => Err("Compression type is not supported by Chronicle".to_string()),
54 }
55 }
56}
57
58impl Configurable for ChronicleCompression {
60 fn metadata() -> Metadata {
61 Compression::metadata()
62 }
63
64 fn generate_schema(
65 generator: &RefCell<SchemaGenerator>,
66 ) -> Result<SchemaObject, GenerateError> {
67 let mut string_metadata = Metadata::with_description("Compression algorithm.");
70 string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
71
72 let none_string_subschema = generate_string_schema("None", None, "No compression.");
73 let gzip_string_subschema = generate_string_schema(
74 "Gzip",
75 Some("[Gzip][gzip] compression."),
76 "[gzip]: https://www.gzip.org/",
77 );
78
79 let mut all_string_oneof_subschema =
80 generate_one_of_schema(&[none_string_subschema, gzip_string_subschema]);
81 apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
82
83 let compression_level_schema =
84 get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
85
86 let mut required = BTreeSet::new();
87 required.insert(ALGORITHM_NAME.to_string());
88
89 let mut properties = IndexMap::new();
90 properties.insert(
91 ALGORITHM_NAME.to_string(),
92 all_string_oneof_subschema.clone(),
93 );
94 properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
95
96 let mut full_subschema = generate_struct_schema(properties, required, None);
97 let mut full_metadata =
98 Metadata::with_description("Compression algorithm and compression level.");
99 full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
100 apply_base_metadata(&mut full_subschema, full_metadata);
101
102 Ok(generate_one_of_schema(&[
103 all_string_oneof_subschema,
104 full_subschema,
105 ]))
106 }
107}
108
109impl ToValue for ChronicleCompression {
110 fn to_value(&self) -> Value {
111 serde_json::to_value(Compression::from(*self))
112 .expect("Could not convert compression settings to JSON")
113 }
114}
115
116impl<'de> de::Deserialize<'de> for ChronicleCompression {
117 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
118 where
119 D: de::Deserializer<'de>,
120 {
121 Compression::deserialize(deserializer)
122 .and_then(|x| ChronicleCompression::try_from(x).map_err(de::Error::custom))
123 }
124}
125
126impl ser::Serialize for ChronicleCompression {
127 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
128 where
129 S: ser::Serializer,
130 {
131 Compression::serialize(&Compression::from(*self), serializer)
132 }
133}