vector/codecs/encoding/
config.rs

1use crate::codecs::Transformer;
2use vector_lib::{
3    codecs::{
4        CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5        encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
6    },
7    configurable::configurable_component,
8};
9
10#[cfg(feature = "codecs-opentelemetry")]
11use vector_lib::codecs::BytesEncoder;
12
// NOTE(review): the `///` lines above and below the attributes both attach to this struct and
// are concatenated into a single doc string (and thus into its generated config schema text).
// Confirm the split placement is intentional before reordering them.
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
pub struct EncodingConfig {
    // Serializer selection (the `codec` key) and its options. Flattened, so these keys appear
    // directly at this config's level rather than under a nested table.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Event transformation rules applied before serialization. Also flattened: the
    // deserialization tests in this file place `only_fields`, `except_fields` and
    // `timestamp_format` right next to `codec`.
    #[serde(flatten)]
    transformer: Transformer,
}
25
26impl EncodingConfig {
27    /// Creates a new `EncodingConfig` with the provided `SerializerConfig` and `Transformer`.
28    pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
29        Self {
30            encoding,
31            transformer,
32        }
33    }
34
35    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
36    pub fn transformer(&self) -> Transformer {
37        self.transformer.clone()
38    }
39
40    /// Get the encoding configuration.
41    pub const fn config(&self) -> &SerializerConfig {
42        &self.encoding
43    }
44
45    /// Build the `Serializer` for this config.
46    pub fn build(&self) -> crate::Result<Serializer> {
47        self.encoding.build()
48    }
49}
50
51impl<T> From<T> for EncodingConfig
52where
53    T: Into<SerializerConfig>,
54{
55    fn from(encoding: T) -> Self {
56        Self {
57            encoding: encoding.into(),
58            transformer: Default::default(),
59        }
60    }
61}
62
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
// Unknown keys at this level are rejected during deserialization.
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional framing settings. When absent, `build` picks a default framer based on the
    // serializer (and, for JSON, on the sink type).
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer + transformer settings. Not flattened here, so they live under a nested
    // `encoding` key (see the `*_with_framing` / `*_without_framing` tests in this file).
    #[configurable(derived)]
    encoding: EncodingConfig,
}
74
75impl EncodingConfigWithFraming {
76    /// Creates a new `EncodingConfigWithFraming` with the provided `FramingConfig`,
77    /// `SerializerConfig` and `Transformer`.
78    pub const fn new(
79        framing: Option<FramingConfig>,
80        encoding: SerializerConfig,
81        transformer: Transformer,
82    ) -> Self {
83        Self {
84            framing,
85            encoding: EncodingConfig {
86                encoding,
87                transformer,
88            },
89        }
90    }
91
92    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
93    pub fn transformer(&self) -> Transformer {
94        self.encoding.transformer.clone()
95    }
96
97    /// Get the encoding configuration.
98    pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
99        (&self.framing, &self.encoding.encoding)
100    }
101
102    /// Build the `Framer` and `Serializer` for this config.
103    pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
104        let framer = self.framing.as_ref().map(|framing| framing.build());
105        let serializer = self.encoding.build()?;
106
107        let framer = match (framer, &serializer) {
108            (Some(framer), _) => framer,
109            (None, Serializer::Json(_)) => match sink_type {
110                SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
111                SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
112            },
113            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
114                LengthDelimitedEncoder::default().into()
115            }
116            (None, Serializer::Gelf(_)) => {
117                // Graylog/GELF always uses null byte delimiter on TCP, see
118                // https://github.com/Graylog2/graylog2-server/issues/1240
119                CharacterDelimitedEncoder::new(0).into()
120            }
121            (None, Serializer::Protobuf(_)) => {
122                // Protobuf uses length-delimited messages, see:
123                // https://developers.google.com/protocol-buffers/docs/techniques#streaming
124                LengthDelimitedEncoder::default().into()
125            }
126            (
127                None,
128                Serializer::Cef(_)
129                | Serializer::Csv(_)
130                | Serializer::Logfmt(_)
131                | Serializer::NativeJson(_)
132                | Serializer::RawMessage(_)
133                | Serializer::Text(_),
134            ) => NewlineDelimitedEncoder::default().into(),
135            #[cfg(feature = "codecs-opentelemetry")]
136            (None, Serializer::Otlp(_)) => BytesEncoder.into(),
137        };
138
139        Ok((framer, serializer))
140    }
141}
142
/// The way a sink processes outgoing events.
///
/// `EncodingConfigWithFraming::build` uses this to pick a default framer for JSON when no
/// explicit framing is configured: newline-delimited for streams, comma-delimited for messages.
///
/// This is a fieldless enum, so it is cheap to copy and can be freely compared and logged.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}
150
151impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
152where
153    F: Into<FramingConfig>,
154    S: Into<SerializerConfig>,
155{
156    fn from((framing, encoding): (Option<F>, S)) -> Self {
157        Self {
158            framing: framing.map(Into::into),
159            encoding: encoding.into().into(),
160        }
161    }
162}
163
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Checks the transformer options that every JSON fixture in this module sets identically.
    fn assert_fixture_transformer(transformer: &Transformer) {
        let expected_only_fields = vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())];
        assert_eq!(transformer.only_fields(), &Some(expected_only_fields));
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// Flattened serializer and transformer keys deserialize into `EncodingConfig`.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let config = serde_json::from_str::<EncodingConfig>(json).unwrap();

        assert!(matches!(config.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// An explicit `framing` section is picked up next to the nested `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config = serde_json::from_str::<EncodingConfigWithFraming>(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// Omitting `framing` leaves it unset while the nested `encoding` still deserializes.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config = serde_json::from_str::<EncodingConfigWithFraming>(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }
}