vector/codecs/encoding/
config.rs

1use crate::codecs::{Encoder, EncoderKind, Transformer};
2use vector_lib::{
3    codecs::{
4        CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5        encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
6    },
7    configurable::configurable_component,
8};
9
10#[cfg(feature = "codecs-opentelemetry")]
11use vector_lib::codecs::BytesEncoder;
12
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
// NOTE(review): the doc comments above are split around the attributes. They presumably feed the
// documentation generated by `configurable_component` — confirm before rewording or moving them.
pub struct EncodingConfig {
    // Serializer selection. Flattened, so its keys (e.g. `codec`) sit directly in this object
    // rather than under a nested key.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Event transformation options. Also flattened, so keys such as `only_fields`,
    // `except_fields` and `timestamp_format` appear alongside the serializer keys.
    #[serde(flatten)]
    transformer: Transformer,
}
25
26impl EncodingConfig {
27    /// Creates a new `EncodingConfig` with the provided `SerializerConfig` and `Transformer`.
28    pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
29        Self {
30            encoding,
31            transformer,
32        }
33    }
34
35    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
36    pub fn transformer(&self) -> Transformer {
37        self.transformer.clone()
38    }
39
40    /// Get the encoding configuration.
41    pub const fn config(&self) -> &SerializerConfig {
42        &self.encoding
43    }
44
45    /// Build the `Serializer` for this config.
46    pub fn build(&self) -> crate::Result<Serializer> {
47        self.encoding.build()
48    }
49}
50
51impl<T> From<T> for EncodingConfig
52where
53    T: Into<SerializerConfig>,
54{
55    fn from(encoding: T) -> Self {
56        Self {
57            encoding: encoding.into(),
58            transformer: Default::default(),
59        }
60    }
61}
62
/// Encoding configuration.
// Pairs an optional framing configuration with an `EncodingConfig`. Because of
// `deny_unknown_fields`, keys other than the fields below are rejected when deserializing.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional framing. When it is `None`, `build` chooses a default framer from the
    // serializer kind and the sink type.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer and transformer settings, nested under the `encoding` key.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
74
75impl EncodingConfigWithFraming {
76    /// Creates a new `EncodingConfigWithFraming` with the provided `FramingConfig`,
77    /// `SerializerConfig` and `Transformer`.
78    pub const fn new(
79        framing: Option<FramingConfig>,
80        encoding: SerializerConfig,
81        transformer: Transformer,
82    ) -> Self {
83        Self {
84            framing,
85            encoding: EncodingConfig {
86                encoding,
87                transformer,
88            },
89        }
90    }
91
92    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
93    pub fn transformer(&self) -> Transformer {
94        self.encoding.transformer.clone()
95    }
96
97    /// Get the encoding configuration.
98    pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
99        (&self.framing, &self.encoding.encoding)
100    }
101
102    /// Build the `Framer` and `Serializer` for this config.
103    pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
104        let framer = self.framing.as_ref().map(|framing| framing.build());
105        let serializer = self.encoding.build()?;
106
107        let framer = match (framer, &serializer) {
108            (Some(framer), _) => framer,
109            (None, Serializer::Json(_)) => match sink_type {
110                SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
111                SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
112            },
113            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
114                LengthDelimitedEncoder::default().into()
115            }
116            (None, Serializer::Gelf(_)) => {
117                // Graylog/GELF always uses null byte delimiter on TCP, see
118                // https://github.com/Graylog2/graylog2-server/issues/1240
119                CharacterDelimitedEncoder::new(0).into()
120            }
121            (None, Serializer::Protobuf(_)) => {
122                // Protobuf uses length-delimited messages, see:
123                // https://developers.google.com/protocol-buffers/docs/techniques#streaming
124                LengthDelimitedEncoder::default().into()
125            }
126            (
127                None,
128                Serializer::Cef(_)
129                | Serializer::Csv(_)
130                | Serializer::Logfmt(_)
131                | Serializer::NativeJson(_)
132                | Serializer::RawMessage(_)
133                | Serializer::Text(_),
134            ) => NewlineDelimitedEncoder::default().into(),
135            #[cfg(feature = "codecs-opentelemetry")]
136            (None, Serializer::Otlp(_)) => BytesEncoder.into(),
137        };
138
139        Ok((framer, serializer))
140    }
141
142    /// Build the `Transformer` and `EncoderKind` for this config.
143    pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> {
144        let (framer, serializer) = self.build(sink_type)?;
145        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
146        Ok((self.transformer(), encoder))
147    }
148}
149
/// The way a sink processes outgoing events.
///
/// `EncodingConfigWithFraming::build` consults this value to pick the default framing
/// for JSON when no framing is configured explicitly.
///
/// The common traits are derived so the value can be copied freely, compared in tests,
/// and printed in diagnostics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}
157
158impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
159where
160    F: Into<FramingConfig>,
161    S: Into<SerializerConfig>,
162{
163    fn from((framing, encoding): (Option<F>, S)) -> Self {
164        Self {
165            framing: framing.map(Into::into),
166            encoding: encoding.into().into(),
167        }
168    }
169}
170
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Checks that `transformer` carries the field filters and timestamp format that
    /// every JSON fixture in this module specifies.
    #[track_caller]
    fn assert_fixture_transformer(transformer: &Transformer) {
        assert_eq!(
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A flat object deserializes into both the serializer config and the transformer.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let config = serde_json::from_str::<EncodingConfig>(json).unwrap();

        assert!(matches!(config.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// An explicit `framing` section is picked up alongside the nested `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config = serde_json::from_str::<EncodingConfigWithFraming>(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// Omitting `framing` leaves it unset while the `encoding` section still parses.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config = serde_json::from_str::<EncodingConfigWithFraming>(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }
}