codecs/encoding/
config.rs

1use vector_config::configurable_component;
2
3use super::{Encoder, EncoderKind, Transformer};
4use crate::encoding::{
5    CharacterDelimitedEncoder, Framer, FramingConfig, LengthDelimitedEncoder,
6    NewlineDelimitedEncoder, Serializer, SerializerConfig,
7};
8
9#[cfg(feature = "opentelemetry")]
10use crate::encoding::BytesEncoder;
11
// NOTE(review): the `///` lines below are presumably picked up by `#[configurable_component]`
// for generated configuration docs — confirm before rewording them.
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
/// Configures how events are encoded into raw bytes.
/// The selected encoding also determines which input types (logs, metrics, traces) are supported.
pub struct EncodingConfig {
    // Serializer selection. `#[serde(flatten)]` places the `SerializerConfig` keys (for example
    // `codec`) directly at this struct's level rather than under a nested `encoding` key.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Transformer settings, flattened the same way: the tests in this file set `only_fields`,
    // `except_fields` and `timestamp_format` right next to `codec`.
    #[serde(flatten)]
    transformer: Transformer,
}
24
25impl EncodingConfig {
26    /// Creates a new `EncodingConfig` with the provided `SerializerConfig` and `Transformer`.
27    pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
28        Self {
29            encoding,
30            transformer,
31        }
32    }
33
34    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
35    pub fn transformer(&self) -> Transformer {
36        self.transformer.clone()
37    }
38
39    /// Get the encoding configuration.
40    pub const fn config(&self) -> &SerializerConfig {
41        &self.encoding
42    }
43
44    /// Build the `Serializer` for this config.
45    pub fn build(&self) -> vector_common::Result<Serializer> {
46        self.encoding.build()
47    }
48}
49
50impl<T> From<T> for EncodingConfig
51where
52    T: Into<SerializerConfig>,
53{
54    fn from(encoding: T) -> Self {
55        Self {
56            encoding: encoding.into(),
57            transformer: Default::default(),
58        }
59    }
60}
61
// Pairs an `EncodingConfig` with an optional framing method.
// `deny_unknown_fields` makes deserialization fail on any top-level key other than
// `framing` and `encoding`.
/// Encoding configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional: when absent, `EncodingConfigWithFraming::build` picks a default framer based on
    // the serializer (and, for JSON, on the sink type).
    // NOTE(review): `#[configurable(derived)]` presumably reuses the field type's own
    // description in the generated schema — confirm against `vector_config`.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer and transformer settings, nested under the `encoding` key (see the tests in
    // this file for the expected JSON shape).
    #[configurable(derived)]
    encoding: EncodingConfig,
}
73
74impl EncodingConfigWithFraming {
75    /// Creates a new `EncodingConfigWithFraming` with the provided `FramingConfig`,
76    /// `SerializerConfig` and `Transformer`.
77    pub const fn new(
78        framing: Option<FramingConfig>,
79        encoding: SerializerConfig,
80        transformer: Transformer,
81    ) -> Self {
82        Self {
83            framing,
84            encoding: EncodingConfig {
85                encoding,
86                transformer,
87            },
88        }
89    }
90
91    /// Build a `Transformer` that applies the encoding rules to an event before serialization.
92    pub fn transformer(&self) -> Transformer {
93        self.encoding.transformer.clone()
94    }
95
96    /// Get the encoding configuration.
97    pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
98        (&self.framing, &self.encoding.encoding)
99    }
100
101    /// Build the `Framer` and `Serializer` for this config.
102    pub fn build(&self, sink_type: SinkType) -> vector_common::Result<(Framer, Serializer)> {
103        let framer = self.framing.as_ref().map(|framing| framing.build());
104        let serializer = self.encoding.build()?;
105
106        let framer = match (framer, &serializer) {
107            (Some(framer), _) => framer,
108            (None, Serializer::Json(_)) => match sink_type {
109                SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
110                SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
111            },
112            (None, Serializer::Avro(_) | Serializer::Native(_)) => {
113                LengthDelimitedEncoder::default().into()
114            }
115            (None, Serializer::Gelf(_)) => {
116                // Graylog/GELF always uses null byte delimiter on TCP, see
117                // https://github.com/Graylog2/graylog2-server/issues/1240
118                CharacterDelimitedEncoder::new(0).into()
119            }
120            (None, Serializer::Protobuf(_)) => {
121                // Protobuf uses length-delimited messages, see:
122                // https://developers.google.com/protocol-buffers/docs/techniques#streaming
123                LengthDelimitedEncoder::default().into()
124            }
125            (
126                None,
127                Serializer::Cef(_)
128                | Serializer::Csv(_)
129                | Serializer::Logfmt(_)
130                | Serializer::NativeJson(_)
131                | Serializer::RawMessage(_)
132                | Serializer::Text(_),
133            ) => NewlineDelimitedEncoder::default().into(),
134            #[cfg(feature = "syslog")]
135            (None, Serializer::Syslog(_)) => NewlineDelimitedEncoder::default().into(),
136            #[cfg(feature = "opentelemetry")]
137            (None, Serializer::Otlp(_)) => BytesEncoder.into(),
138        };
139
140        Ok((framer, serializer))
141    }
142
143    /// Build the `Transformer` and `EncoderKind` for this config.
144    pub fn build_encoder(
145        &self,
146        sink_type: SinkType,
147    ) -> vector_common::Result<(Transformer, EncoderKind)> {
148        let (framer, serializer) = self.build(sink_type)?;
149        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
150        Ok((self.transformer(), encoder))
151    }
152}
153
/// The way a sink processes outgoing events.
///
/// The variants carry no data, so the type is `Copy`: a value can be passed by value to several
/// builders, compared, and printed in diagnostics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Events are sent in a continuous stream.
    StreamBased,
    /// Events are sent in a batch as a message.
    MessageBased,
}
161
162impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
163where
164    F: Into<FramingConfig>,
165    S: Into<SerializerConfig>,
166{
167    fn from((framing, encoding): (Option<F>, S)) -> Self {
168        Self {
169            framing: framing.map(Into::into),
170            encoding: encoding.into().into(),
171        }
172    }
173}
174
#[cfg(test)]
mod test {
    use lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::encoding::TimestampFormat;

    /// Checks that `transformer` carries the field filters and timestamp format that every JSON
    /// fixture in this module configures.
    fn assert_fixture_transformer(transformer: &Transformer) {
        assert_eq!(
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A flat `EncodingConfig` reads the codec and the transformer options from one object.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let config: EncodingConfig = serde_json::from_str(json).unwrap();

        assert!(matches!(config.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// An explicit `framing` section is kept next to the nested `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// Omitting `framing` deserializes to `None` and leaves the encoding untouched.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }
}