vector/sinks/util/
encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, config::telemetry,
8    request_metadata::GroupedCountByteSize,
9};
10
11use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
12
/// Encodes an input of type `T` and writes the encoded bytes to an [`io::Write`] sink.
pub trait Encoder<T> {
    /// Encodes the input into the provided writer.
    ///
    /// On success, returns a `(usize, GroupedCountByteSize)` pair. The implementations in
    /// this module report the number of encoded bytes handed to `writer` as the first element
    /// and the estimated JSON size of the encoded events, grouped for telemetry, as the second.
    ///
    /// # Errors
    ///
    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
25
26impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
27    fn encode_input(
28        &self,
29        events: Vec<Event>,
30        writer: &mut dyn io::Write,
31    ) -> io::Result<(usize, GroupedCountByteSize)> {
32        let mut encoder = self.1.clone();
33        let mut bytes_written = 0;
34        let mut n_events_pending = events.len();
35        let is_empty = events.is_empty();
36        let batch_prefix = encoder.batch_prefix();
37        write_all(writer, n_events_pending, batch_prefix)?;
38        bytes_written += batch_prefix.len();
39
40        let mut byte_size = telemetry().create_request_count_byte_size();
41
42        for (position, mut event) in events.into_iter().with_position() {
43            self.0.transform(&mut event);
44
45            // Ensure the json size is calculated after any fields have been removed
46            // by the transformer.
47            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
48
49            let mut bytes = BytesMut::new();
50            match (position, encoder.framer()) {
51                (
52                    Position::Last | Position::Only,
53                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
54                ) => {
55                    encoder
56                        .serialize(event, &mut bytes)
57                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
58                }
59                _ => {
60                    encoder
61                        .encode(event, &mut bytes)
62                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
63                }
64            }
65            write_all(writer, n_events_pending, &bytes)?;
66            bytes_written += bytes.len();
67            n_events_pending -= 1;
68        }
69
70        let batch_suffix = encoder.batch_suffix(is_empty);
71        assert!(n_events_pending == 0);
72        write_all(writer, 0, batch_suffix)?;
73        bytes_written += batch_suffix.len();
74
75        Ok((bytes_written, byte_size))
76    }
77}
78
79impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
80    fn encode_input(
81        &self,
82        mut event: Event,
83        writer: &mut dyn io::Write,
84    ) -> io::Result<(usize, GroupedCountByteSize)> {
85        let mut encoder = self.1.clone();
86        self.0.transform(&mut event);
87
88        let mut byte_size = telemetry().create_request_count_byte_size();
89        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
90
91        let mut bytes = BytesMut::new();
92        encoder
93            .serialize(event, &mut bytes)
94            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
95        write_all(writer, 1, &bytes)?;
96        Ok((bytes.len(), byte_size))
97    }
98}
99
100/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
101/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
102///
103/// # Arguments
104///
105/// * `writer`           - The object implementing io::Write to write data to.
106/// * `n_events_pending` - The number of events that are dropped if this write fails.
107/// * `buf`              - The buffer to write.
108pub fn write_all(
109    writer: &mut dyn io::Write,
110    n_events_pending: usize,
111    buf: &[u8],
112) -> io::Result<()> {
113    writer.write_all(buf).inspect_err(|error| {
114        emit!(EncoderWriteError {
115            error,
116            count: n_events_pending,
117        });
118    })
119}
120
/// Runs `f` against a byte-counting wrapper around `inner` and reports how much was written.
///
/// The wrapper forwards every `write` and `flush` call to `inner` and adds up the byte counts
/// returned by successful `write` calls.
///
/// # Arguments
///
/// * `inner` - The writer that ultimately receives the data.
/// * `input` - The value passed through to `f` untouched.
/// * `f`     - The callback that writes `input` to the writer it is given.
///
/// # Errors
///
/// If `f` fails, its error is converted into an [`io::Error`] and returned; the number of
/// bytes written up to that point is not reported.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Pass-through writer that remembers how many bytes the wrapped writer accepted.
    struct CountingWriter<'sink> {
        sink: &'sink mut dyn io::Write,
        total: usize,
    }

    impl io::Write for CountingWriter<'_> {
        #[allow(clippy::disallowed_methods)] // We pass on the result of `write` to the caller.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.sink.write(buf).map(|accepted| {
                self.total += accepted;
                accepted
            })
        }

        fn flush(&mut self) -> io::Result<()> {
            self.sink.flush()
        }
    }

    let mut counting = CountingWriter {
        sink: inner,
        total: 0,
    };

    match f(&mut counting, input) {
        Ok(()) => Ok(counting.total),
        Err(error) => Err(error.into()),
    }
}
148
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    /// The batch encoder under test: a transformer paired with a framed encoder.
    type BatchEncoder = (Transformer, crate::codecs::Encoder<Framer>);

    /// Builds a batch encoder that serializes events as JSON and frames them with `framer`.
    fn json_batch_encoder(framer: Framer) -> BatchEncoder {
        (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                framer,
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Builds a batch encoder that serializes events as `test_proto.User` protobuf messages
    /// and frames them with the default length-delimited framer.
    fn protobuf_batch_encoder() -> BatchEncoder {
        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        )
    }

    /// Builds a log event holding a single string field.
    fn log_event(key: &'static str, value: &'static str) -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from(key),
            Value::from(value),
        )])))
    }

    /// Builds the log event that the protobuf tests compare against the `test_proto.pb` fixture.
    fn proto_user_event() -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))
    }

    /// Sums the estimated JSON size of all `events`.
    fn total_json_size(events: &[Event]) -> JsonSize {
        events
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>()
    }

    /// Concatenates `messages`, each preceded by its length as a 4-byte big-endian integer.
    fn length_prefixed(messages: &[&[u8]]) -> Bytes {
        // default LengthDelimitedCoderOptions.length_field_length is 4
        let capacity: usize = messages.iter().map(|message| 4 + message.len()).sum();
        let mut buf = BytesMut::with_capacity(capacity);
        for message in messages {
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(message);
        }
        buf.freeze()
    }

    /// Directory holding the protobuf fixtures, resolved relative to the crate manifest.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = json_batch_encoder(CharacterDelimitedEncoder::new(b',').into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_json_single() {
        let encoding = json_batch_encoder(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![log_event("key", "value")];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = json_batch_encoder(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![
            log_event("key", "value1"),
            log_event("key", "value2"),
            log_event("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = json_batch_encoder(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = json_batch_encoder(NewlineDelimitedEncoder::default().into());

        let input = vec![log_event("key", "value")];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = json_batch_encoder(NewlineDelimitedEncoder::default().into());

        let input = vec![
            log_event("key", "value1"),
            log_event("key", "value2"),
            log_event("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let input = log_event("key", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let input = log_event("message", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();
        let expected_bytes = length_prefixed(&[message_raw.as_slice()]);

        let encoding = protobuf_batch_encoder();

        let input = vec![proto_user_event()];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        // The same fixture message is expected twice, once per input event.
        let messages = [message_raw.as_slice(), message_raw.as_slice()];
        let total_input_proto_size: usize = messages.iter().map(|message| message.len()).sum();
        let expected_bytes = length_prefixed(&messages);

        let encoding = protobuf_batch_encoder();

        let input = vec![proto_user_event(), proto_user_event()];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_as_tracked_write_counts_bytes() {
        let mut writer: Vec<u8> = Vec::new();
        let written = as_tracked_write(
            &mut writer,
            "hello",
            |out: &mut dyn io::Write, text: &str| -> io::Result<()> {
                out.write_all(text.as_bytes())?;
                out.write_all(b", world")
            },
        )
        .unwrap();

        assert_eq!(written, 12);
        assert_eq!(String::from_utf8(writer).unwrap(), "hello, world");
    }
}