vector/codecs/encoding/config.rs

1use crate::codecs::Transformer;
2use vector_lib::codecs::{
3    encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
4    CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5};
6use vector_lib::configurable::configurable_component;
7
// NOTE(review): the `///` lines before and after the attributes are concatenated into a single
// doc string, which `#[configurable_component]` presumably also uses for the config schema.
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
pub struct EncodingConfig {
    // Serializer/codec selection. Flattened, so its keys (e.g. `codec`, as in the tests at the
    // bottom of this file) live at the same level as the transformer options.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Options applied to events before serialization (the tests exercise `only_fields`,
    // `except_fields` and `timestamp_format`), also flattened into this level.
    // NOTE(review): serde does not support `deny_unknown_fields` together with `flatten`, so
    // misspelled keys at this level are likely ignored rather than rejected — confirm.
    #[serde(flatten)]
    transformer: Transformer,
}
20
21impl EncodingConfig {
22    /// Creates a new `EncodingConfig` with the provided `SerializerConfig` and `Transformer`.
23    pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
24        Self {
25            encoding,
26            transformer,
27        }
28    }
29
30    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
31    pub fn transformer(&self) -> Transformer {
32        self.transformer.clone()
33    }
34
35    /// Get the encoding configuration.
36    pub const fn config(&self) -> &SerializerConfig {
37        &self.encoding
38    }
39
40    /// Build the `Serializer` for this config.
41    pub fn build(&self) -> crate::Result<Serializer> {
42        self.encoding.build()
43    }
44}
45
46impl<T> From<T> for EncodingConfig
47where
48    T: Into<SerializerConfig>,
49{
50    fn from(encoding: T) -> Self {
51        Self {
52            encoding: encoding.into(),
53            transformer: Default::default(),
54        }
55    }
56}
57
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
// `deny_unknown_fields` makes deserialization reject any key at this level other than
// `framing` and `encoding`.
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional explicit framing. When absent, `build` picks a default framer from the
    // serializer and the `SinkType`.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Codec and transformer options, nested under the `encoding` key (not flattened, unlike the
    // fields inside `EncodingConfig` itself).
    #[configurable(derived)]
    encoding: EncodingConfig,
}
69
70impl EncodingConfigWithFraming {
71    /// Creates a new `EncodingConfigWithFraming` with the provided `FramingConfig`,
72    /// `SerializerConfig` and `Transformer`.
73    pub const fn new(
74        framing: Option<FramingConfig>,
75        encoding: SerializerConfig,
76        transformer: Transformer,
77    ) -> Self {
78        Self {
79            framing,
80            encoding: EncodingConfig {
81                encoding,
82                transformer,
83            },
84        }
85    }
86
87    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
88    pub fn transformer(&self) -> Transformer {
89        self.encoding.transformer.clone()
90    }
91
92    /// Get the encoding configuration.
93    pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
94        (&self.framing, &self.encoding.encoding)
95    }
96
97    /// Build the `Framer` and `Serializer` for this config.
98    pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
99        let framer = self.framing.as_ref().map(|framing| framing.build());
100        let serializer = self.encoding.build()?;
101
102        let framer = match (framer, &serializer) {
103            (Some(framer), _) => framer,
104            (None, Serializer::Json(_)) => match sink_type {
105                SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
106                SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
107            },
108            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
109                LengthDelimitedEncoder::default().into()
110            }
111            (None, Serializer::Gelf(_)) => {
112                // Graylog/GELF always uses null byte delimiter on TCP, see
113                // https://github.com/Graylog2/graylog2-server/issues/1240
114                CharacterDelimitedEncoder::new(0).into()
115            }
116            (None, Serializer::Protobuf(_)) => {
117                // Protobuf uses length-delimited messages, see:
118                // https://developers.google.com/protocol-buffers/docs/techniques#streaming
119                LengthDelimitedEncoder::default().into()
120            }
121            (
122                None,
123                Serializer::Cef(_)
124                | Serializer::Csv(_)
125                | Serializer::Logfmt(_)
126                | Serializer::NativeJson(_)
127                | Serializer::RawMessage(_)
128                | Serializer::Text(_),
129            ) => NewlineDelimitedEncoder::default().into(),
130        };
131
132        Ok((framer, serializer))
133    }
134}
135
/// The way a sink processes outgoing events.
///
/// Used by `EncodingConfigWithFraming::build` to choose a default framer when no explicit
/// framing is configured.
// A fieldless enum: deriving `Copy`/`Eq`/`Debug` lets callers pass, compare and log it freely.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}
143
144impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
145where
146    F: Into<FramingConfig>,
147    S: Into<SerializerConfig>,
148{
149    fn from((framing, encoding): (Option<F>, S)) -> Self {
150        Self {
151            framing: framing.map(Into::into),
152            encoding: encoding.into().into(),
153        }
154    }
155}
156
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{parse_value_path, ConfigValuePath};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Asserts the transformer options declared by every JSON fixture in this module:
    /// `only_fields = ["a.b[0]"]`, `except_fields = ["ignore_me"]`, `timestamp_format = "unix"`.
    fn assert_fixture_transformer(transformer: &Transformer) {
        let expected_only_fields = Some(vec![ConfigValuePath(
            parse_value_path("a.b[0]").unwrap(),
        )]);
        assert_eq!(transformer.only_fields(), &expected_only_fields);
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    #[test]
    fn deserialize_encoding_config() {
        let string = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let encoding = serde_json::from_str::<EncodingConfig>(string).unwrap();

        assert!(matches!(encoding.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }

    #[test]
    fn deserialize_encoding_config_with_framing() {
        let string = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding = serde_json::from_str::<EncodingConfigWithFraming>(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }

    #[test]
    fn deserialize_encoding_config_without_framing() {
        let string = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding = serde_json::from_str::<EncodingConfigWithFraming>(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }
}