vector/sinks/gcp_chronicle/
compression.rs1use std::{cell::RefCell, collections::BTreeSet};
2
3use indexmap::IndexMap;
4use serde::{de, ser};
5use serde_json::Value;
6use vector_lib::configurable::{
7 Configurable, GenerateError, Metadata, ToValue,
8 attributes::CustomAttribute,
9 schema::{
10 SchemaGenerator, SchemaObject, apply_base_metadata, generate_one_of_schema,
11 generate_struct_schema, get_or_generate_schema,
12 },
13};
14
15use crate::sinks::util::{
16 Compression,
17 buffer::compression::{
18 ALGORITHM_NAME, CompressionLevel, ENUM_TAGGING_MODE, LEVEL_NAME, generate_string_schema,
19 },
20};
21
22#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
24#[derivative(Default)]
25pub enum ChronicleCompression {
26 #[derivative(Default)]
28 None,
29
30 Gzip(CompressionLevel),
34}
35
36impl From<ChronicleCompression> for Compression {
37 fn from(compression: ChronicleCompression) -> Self {
38 match compression {
39 ChronicleCompression::None => Compression::None,
40 ChronicleCompression::Gzip(compression_level) => Compression::Gzip(compression_level),
41 }
42 }
43}
44
45impl TryFrom<Compression> for ChronicleCompression {
46 type Error = String;
47
48 fn try_from(compression: Compression) -> Result<Self, Self::Error> {
49 match compression {
50 Compression::None => Ok(ChronicleCompression::None),
51 Compression::Gzip(compression_level) => {
52 Ok(ChronicleCompression::Gzip(compression_level))
53 }
54 _ => Err("Compression type is not supported by Chronicle".to_string()),
55 }
56 }
57}
58
59impl Configurable for ChronicleCompression {
61 fn metadata() -> Metadata {
62 Compression::metadata()
63 }
64
65 fn generate_schema(
66 generator: &RefCell<SchemaGenerator>,
67 ) -> Result<SchemaObject, GenerateError> {
68 let mut string_metadata = Metadata::with_description("Compression algorithm.");
71 string_metadata.add_custom_attribute(CustomAttribute::kv(ENUM_TAGGING_MODE, "external"));
72
73 let none_string_subschema = generate_string_schema("None", None, "No compression.");
74 let gzip_string_subschema = generate_string_schema(
75 "Gzip",
76 Some("[Gzip][gzip] compression."),
77 "[gzip]: https://www.gzip.org/",
78 );
79
80 let mut all_string_oneof_subschema =
81 generate_one_of_schema(&[none_string_subschema, gzip_string_subschema]);
82 apply_base_metadata(&mut all_string_oneof_subschema, string_metadata);
83
84 let compression_level_schema =
85 get_or_generate_schema(&CompressionLevel::as_configurable_ref(), generator, None)?;
86
87 let mut required = BTreeSet::new();
88 required.insert(ALGORITHM_NAME.to_string());
89
90 let mut properties = IndexMap::new();
91 properties.insert(
92 ALGORITHM_NAME.to_string(),
93 all_string_oneof_subschema.clone(),
94 );
95 properties.insert(LEVEL_NAME.to_string(), compression_level_schema);
96
97 let mut full_subschema = generate_struct_schema(properties, required, None);
98 let mut full_metadata =
99 Metadata::with_description("Compression algorithm and compression level.");
100 full_metadata.add_custom_attribute(CustomAttribute::flag("docs::hidden"));
101 apply_base_metadata(&mut full_subschema, full_metadata);
102
103 Ok(generate_one_of_schema(&[
104 all_string_oneof_subschema,
105 full_subschema,
106 ]))
107 }
108}
109
110impl ToValue for ChronicleCompression {
111 fn to_value(&self) -> Value {
112 serde_json::to_value(Compression::from(*self))
113 .expect("Could not convert compression settings to JSON")
114 }
115}
116
117impl<'de> de::Deserialize<'de> for ChronicleCompression {
118 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
119 where
120 D: de::Deserializer<'de>,
121 {
122 Compression::deserialize(deserializer)
123 .and_then(|x| ChronicleCompression::try_from(x).map_err(de::Error::custom))
124 }
125}
126
127impl ser::Serialize for ChronicleCompression {
128 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129 where
130 S: ser::Serializer,
131 {
132 Compression::serialize(&Compression::from(*self), serializer)
133 }
134}