vector/sinks/util/encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::codecs::encoding::Framer;
7use vector_lib::request_metadata::GroupedCountByteSize;
8use vector_lib::{config::telemetry, EstimatedJsonEncodedSizeOf};
9
10use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
11
12pub trait Encoder<T> {
13    /// Encodes the input into the provided writer.
14    ///
15    /// # Errors
16    ///
17    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
18    fn encode_input(
19        &self,
20        input: T,
21        writer: &mut dyn io::Write,
22    ) -> io::Result<(usize, GroupedCountByteSize)>;
23}
24
25impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
26    fn encode_input(
27        &self,
28        events: Vec<Event>,
29        writer: &mut dyn io::Write,
30    ) -> io::Result<(usize, GroupedCountByteSize)> {
31        let mut encoder = self.1.clone();
32        let mut bytes_written = 0;
33        let mut n_events_pending = events.len();
34        let is_empty = events.is_empty();
35        let batch_prefix = encoder.batch_prefix();
36        write_all(writer, n_events_pending, batch_prefix)?;
37        bytes_written += batch_prefix.len();
38
39        let mut byte_size = telemetry().create_request_count_byte_size();
40
41        for (position, mut event) in events.into_iter().with_position() {
42            self.0.transform(&mut event);
43
44            // Ensure the json size is calculated after any fields have been removed
45            // by the transformer.
46            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
47
48            let mut bytes = BytesMut::new();
49            match (position, encoder.framer()) {
50                (
51                    Position::Last | Position::Only,
52                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
53                ) => {
54                    encoder
55                        .serialize(event, &mut bytes)
56                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
57                }
58                _ => {
59                    encoder
60                        .encode(event, &mut bytes)
61                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
62                }
63            }
64            write_all(writer, n_events_pending, &bytes)?;
65            bytes_written += bytes.len();
66            n_events_pending -= 1;
67        }
68
69        let batch_suffix = encoder.batch_suffix(is_empty);
70        assert!(n_events_pending == 0);
71        write_all(writer, 0, batch_suffix)?;
72        bytes_written += batch_suffix.len();
73
74        Ok((bytes_written, byte_size))
75    }
76}
77
78impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
79    fn encode_input(
80        &self,
81        mut event: Event,
82        writer: &mut dyn io::Write,
83    ) -> io::Result<(usize, GroupedCountByteSize)> {
84        let mut encoder = self.1.clone();
85        self.0.transform(&mut event);
86
87        let mut byte_size = telemetry().create_request_count_byte_size();
88        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
89
90        let mut bytes = BytesMut::new();
91        encoder
92            .serialize(event, &mut bytes)
93            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
94        write_all(writer, 1, &bytes)?;
95        Ok((bytes.len(), byte_size))
96    }
97}
98
99/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
100/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
101///
102/// # Arguments
103///
104/// * `writer`           - The object implementing io::Write to write data to.
105/// * `n_events_pending` - The number of events that are dropped if this write fails.
106/// * `buf`              - The buffer to write.
107pub fn write_all(
108    writer: &mut dyn io::Write,
109    n_events_pending: usize,
110    buf: &[u8],
111) -> io::Result<()> {
112    writer.write_all(buf).inspect_err(|error| {
113        emit!(EncoderWriteError {
114            error,
115            count: n_events_pending,
116        });
117    })
118}
119
/// Runs `f` with `input` against a wrapper around `inner` and returns the number of bytes that
/// `inner` accepted during the call.
///
/// Only bytes reported as written by `inner.write` are counted. If a write fails, nothing is
/// added for it.
///
/// # Errors
///
/// Returns the error from `inner.write` if a write fails. Otherwise returns the error produced
/// by `f`, converted into an `io::Error`.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Pass-through writer that tallies the bytes its target accepts.
    struct ByteCounter<'target> {
        total: usize,
        target: &'target mut dyn io::Write,
    }

    impl io::Write for ByteCounter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)] // We pass on the result of `write` to the caller.
            let accepted = self.target.write(buf)?;
            self.total += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counter = ByteCounter {
        total: 0,
        target: inner,
    };
    match f(&mut counter, input) {
        Ok(()) => Ok(counter.total),
        Err(error) => Err(error.into()),
    }
}
147
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::env;
    use std::path::PathBuf;

    use bytes::{BufMut, Bytes};
    use vector_lib::codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions};
    use vector_lib::codecs::{
        CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
        NewlineDelimitedEncoder, TextSerializerConfig,
    };
    use vector_lib::event::LogEvent;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};
    use vrl::value::{KeyString, Value};

    use super::*;

    /// Batch encoder under test: a transformer paired with a framed codec encoder.
    type BatchEncoding = (Transformer, crate::codecs::Encoder<Framer>);

    /// Builds a batch encoder that serializes JSON with the given framer.
    fn json_batch_encoding(framer: Framer) -> BatchEncoding {
        (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                framer,
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Builds a log event containing a single string field.
    fn single_field_log(key: &str, value: &str) -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from(key),
            Value::from(value),
        )])))
    }

    /// Sum of the estimated JSON sizes of `events`, as the encoder is expected to report it.
    fn total_json_size(events: &[Event]) -> JsonSize {
        events
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum()
    }

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Builds a length-delimited protobuf batch encoder for the `test_proto.User` message.
    fn protobuf_encoding() -> BatchEncoding {
        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
            },
        };

        (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        )
    }

    /// Log event whose protobuf encoding matches `tests/data/protobuf/test_proto.pb`.
    fn protobuf_user_event() -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))
    }

    /// Frames each message with a 4-byte length prefix followed by the message bytes.
    fn length_delimited(messages: &[&[u8]]) -> Bytes {
        let mut buf = BytesMut::new();
        for message in messages {
            // default LengthDelimitedCoderOptions.length_field_length is 4
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(message);
        }
        buf.freeze()
    }

    /// Writer whose every write fails, used to exercise the error path.
    struct BrokenWriter;

    impl io::Write for BrokenWriter {
        fn write(&mut self, _buf: &[u8]) -> io::Result<usize> {
            Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = json_batch_encoding(CharacterDelimitedEncoder::new(b',').into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_json_single() {
        let encoding = json_batch_encoding(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![single_field_log("key", "value")];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = json_batch_encoding(CharacterDelimitedEncoder::new(b',').into());

        let input = vec![
            single_field_log("key", "value1"),
            single_field_log("key", "value2"),
            single_field_log("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = json_batch_encoding(NewlineDelimitedEncoder::default().into());

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = json_batch_encoding(NewlineDelimitedEncoder::default().into());

        let input = vec![single_field_log("key", "value")];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = json_batch_encoding(NewlineDelimitedEncoder::default().into());

        let input = vec![
            single_field_log("key", "value1"),
            single_field_log("key", "value2"),
            single_field_log("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_propagates_write_error() {
        let encoding = json_batch_encoding(CharacterDelimitedEncoder::new(b',').into());

        let error = encoding
            .encode_input(vec![single_field_log("key", "value")], &mut BrokenWriter)
            .err()
            .expect("encoding into a failing writer must return an error");

        assert_eq!(error.kind(), io::ErrorKind::BrokenPipe);
    }

    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let input = single_field_log("key", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let input = single_field_log("message", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();
        let expected_bytes = length_delimited(&[message_raw.as_slice()]);

        let input = vec![protobuf_user_event()];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = protobuf_encoding()
            .encode_input(input, &mut writer)
            .unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = [message_raw.as_slice(), message_raw.as_slice()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();
        let expected_bytes = length_delimited(&messages);

        let input = vec![protobuf_user_event(), protobuf_user_event()];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, size) = protobuf_encoding()
            .encode_input(input, &mut writer)
            .unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_as_tracked_write_counts_bytes() {
        let mut writer: Vec<u8> = Vec::new();
        let written = as_tracked_write(&mut writer, ["ab", "cde"], |inner, parts| {
            parts
                .iter()
                .try_for_each(|part| inner.write_all(part.as_bytes()))
        })
        .unwrap();

        assert_eq!(written, 5);
        assert_eq!(writer, b"abcde".to_vec());
    }
}