vector/codecs/encoding/
config.rs

1use vector_lib::{
2    codecs::{
3        CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
4        encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
5    },
6    configurable::configurable_component,
7};
8
9use crate::codecs::Transformer;
10
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
pub struct EncodingConfig {
    // Serializer selection. Flattened, so its keys (e.g. `codec`) sit directly in this
    // struct's own map rather than under a nested `encoding` key.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Event transformation options (`only_fields`, `except_fields`, `timestamp_format`).
    // Also flattened, so these keys live side by side with the serializer keys.
    #[serde(flatten)]
    transformer: Transformer,
}
23
24impl EncodingConfig {
25    /// Creates a new `EncodingConfig` with the provided `SerializerConfig` and `Transformer`.
26    pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
27        Self {
28            encoding,
29            transformer,
30        }
31    }
32
33    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
34    pub fn transformer(&self) -> Transformer {
35        self.transformer.clone()
36    }
37
38    /// Get the encoding configuration.
39    pub const fn config(&self) -> &SerializerConfig {
40        &self.encoding
41    }
42
43    /// Build the `Serializer` for this config.
44    pub fn build(&self) -> crate::Result<Serializer> {
45        self.encoding.build()
46    }
47}
48
49impl<T> From<T> for EncodingConfig
50where
51    T: Into<SerializerConfig>,
52{
53    fn from(encoding: T) -> Self {
54        Self {
55            encoding: encoding.into(),
56            transformer: Default::default(),
57        }
58    }
59}
60
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
// Deserialization rejects any key other than the fields declared below.
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional framing. When absent, `build` falls back to a default framer chosen from the
    // serializer kind (and, for JSON, the sink type).
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer configuration together with its transformer options, nested under the
    // `encoding` key.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
72
73impl EncodingConfigWithFraming {
74    /// Creates a new `EncodingConfigWithFraming` with the provided `FramingConfig`,
75    /// `SerializerConfig` and `Transformer`.
76    pub const fn new(
77        framing: Option<FramingConfig>,
78        encoding: SerializerConfig,
79        transformer: Transformer,
80    ) -> Self {
81        Self {
82            framing,
83            encoding: EncodingConfig {
84                encoding,
85                transformer,
86            },
87        }
88    }
89
90    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
91    pub fn transformer(&self) -> Transformer {
92        self.encoding.transformer.clone()
93    }
94
95    /// Get the encoding configuration.
96    pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
97        (&self.framing, &self.encoding.encoding)
98    }
99
100    /// Build the `Framer` and `Serializer` for this config.
101    pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
102        let framer = self.framing.as_ref().map(|framing| framing.build());
103        let serializer = self.encoding.build()?;
104
105        let framer = match (framer, &serializer) {
106            (Some(framer), _) => framer,
107            (None, Serializer::Json(_)) => match sink_type {
108                SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
109                SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
110            },
111            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
112                LengthDelimitedEncoder::default().into()
113            }
114            (None, Serializer::Gelf(_)) => {
115                // Graylog/GELF always uses null byte delimiter on TCP, see
116                // https://github.com/Graylog2/graylog2-server/issues/1240
117                CharacterDelimitedEncoder::new(0).into()
118            }
119            (None, Serializer::Protobuf(_)) => {
120                // Protobuf uses length-delimited messages, see:
121                // https://developers.google.com/protocol-buffers/docs/techniques#streaming
122                LengthDelimitedEncoder::default().into()
123            }
124            (
125                None,
126                Serializer::Cef(_)
127                | Serializer::Csv(_)
128                | Serializer::Logfmt(_)
129                | Serializer::NativeJson(_)
130                | Serializer::RawMessage(_)
131                | Serializer::Text(_),
132            ) => NewlineDelimitedEncoder::default().into(),
133        };
134
135        Ok((framer, serializer))
136    }
137}
138
/// The way a sink processes outgoing events.
///
/// Consulted by `EncodingConfigWithFraming::build` when it has to choose a default framer.
/// The type is a plain two-variant tag, so it is `Copy` and comparable: callers can reuse a
/// value after passing it by value, and can log or assert on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}
146
147impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
148where
149    F: Into<FramingConfig>,
150    S: Into<SerializerConfig>,
151{
152    fn from((framing, encoding): (Option<F>, S)) -> Self {
153        Self {
154            framing: framing.map(Into::into),
155            encoding: encoding.into().into(),
156        }
157    }
158}
159
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Asserts the transformer options that every JSON fixture in this module specifies.
    fn assert_fixture_transformer(transformer: &Transformer) {
        let only_fields = Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())]);

        assert_eq!(transformer.only_fields(), &only_fields);
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// Serializer and transformer keys are read from the same (flattened) JSON object.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let parsed: EncodingConfig = serde_json::from_str(json).unwrap();

        assert!(matches!(parsed.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&parsed.transformer());
    }

    /// An explicit `framing` section is picked up alongside the nested `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let parsed: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = parsed.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&parsed.transformer());
    }

    /// Omitting `framing` deserializes to `None` while the encoding is still parsed.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let parsed: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = parsed.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&parsed.transformer());
    }
}