//! vector/sinks/util/encoding.rs
//! Shared sink encoding helpers: `Encoder` trait implementations and write utilities.

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9    config::telemetry,
10    request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
/// Abstraction over encoding one or more events into a byte stream.
///
/// Implementations return both the number of bytes written to the writer and
/// the grouped estimated-JSON size of the encoded events, which callers use
/// for request telemetry.
pub trait Encoder<T> {
    /// Encodes the input into the provided writer.
    ///
    /// Returns the number of bytes written and the telemetry size accounting
    /// for the encoded input.
    ///
    /// # Errors
    ///
    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
28impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
29    fn encode_input(
30        &self,
31        events: Vec<Event>,
32        writer: &mut dyn io::Write,
33    ) -> io::Result<(usize, GroupedCountByteSize)> {
34        let mut encoder = self.1.clone();
35        let mut bytes_written = 0;
36        let mut n_events_pending = events.len();
37        let is_empty = events.is_empty();
38        let batch_prefix = encoder.batch_prefix();
39        write_all(writer, n_events_pending, batch_prefix)?;
40        bytes_written += batch_prefix.len();
41
42        let mut byte_size = telemetry().create_request_count_byte_size();
43
44        for (position, mut event) in events.into_iter().with_position() {
45            self.0.transform(&mut event);
46
47            // Ensure the json size is calculated after any fields have been removed
48            // by the transformer.
49            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51            let mut bytes = BytesMut::new();
52            match (position, encoder.framer()) {
53                (
54                    Position::Last | Position::Only,
55                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56                ) => {
57                    encoder
58                        .serialize(event, &mut bytes)
59                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60                }
61                _ => {
62                    encoder
63                        .encode(event, &mut bytes)
64                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65                }
66            }
67            write_all(writer, n_events_pending, &bytes)?;
68            bytes_written += bytes.len();
69            n_events_pending -= 1;
70        }
71
72        let batch_suffix = encoder.batch_suffix(is_empty);
73        assert!(n_events_pending == 0);
74        write_all(writer, 0, batch_suffix)?;
75        bytes_written += batch_suffix.len();
76
77        Ok((bytes_written, byte_size))
78    }
79}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82    fn encode_input(
83        &self,
84        mut event: Event,
85        writer: &mut dyn io::Write,
86    ) -> io::Result<(usize, GroupedCountByteSize)> {
87        let mut encoder = self.1.clone();
88        self.0.transform(&mut event);
89
90        let mut byte_size = telemetry().create_request_count_byte_size();
91        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93        let mut bytes = BytesMut::new();
94        encoder
95            .serialize(event, &mut bytes)
96            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97        write_all(writer, 1, &bytes)?;
98        Ok((bytes.len(), byte_size))
99    }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Encodes a whole batch of events in a single call to the batch encoder
    /// (e.g. Arrow), writing the resulting buffer to `writer`.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // The `tokio_util::codec::Encoder` trait is already in scope from the
        // file-level imports, so no local `use` is needed here.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();

        // Transform each event first so the recorded JSON size reflects any
        // fields removed by the transformer.
        let transformed_events: Vec<Event> = events
            .into_iter()
            .map(|mut event| {
                self.0.transform(&mut event);
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
                event
            })
            .collect();

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;

        // The whole batch is written at once, so all `n_events` events are
        // dropped if this single write fails.
        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
131
132impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
133    fn encode_input(
134        &self,
135        events: Vec<Event>,
136        writer: &mut dyn io::Write,
137    ) -> io::Result<(usize, GroupedCountByteSize)> {
138        // Delegate to the specific encoder implementation
139        match &self.1 {
140            vector_lib::codecs::EncoderKind::Framed(encoder) => {
141                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
142            }
143            #[cfg(feature = "codecs-arrow")]
144            vector_lib::codecs::EncoderKind::Batch(encoder) => {
145                (self.0.clone(), encoder.clone()).encode_input(events, writer)
146            }
147        }
148    }
149}
150
151/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
152/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
153///
154/// # Arguments
155///
156/// * `writer`           - The object implementing io::Write to write data to.
157/// * `n_events_pending` - The number of events that are dropped if this write fails.
158/// * `buf`              - The buffer to write.
159pub fn write_all(
160    writer: &mut dyn io::Write,
161    n_events_pending: usize,
162    buf: &[u8],
163) -> io::Result<()> {
164    writer.write_all(buf).inspect_err(|error| {
165        emit!(EncoderWriteError {
166            error,
167            count: n_events_pending,
168        });
169    })
170}
171
/// Runs `f` against a counting wrapper around `inner`, returning the total
/// number of bytes that `f` successfully wrote through the writer.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Forwarding writer that accumulates how many bytes `inner` accepted.
    struct CountingWriter<'a> {
        written: usize,
        inner: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)] // We pass on the result of `write` to the caller.
            let accepted = self.inner.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }

    let mut counter = CountingWriter { written: 0, inner };
    match f(&mut counter, input) {
        Ok(()) => Ok(counter.written),
        Err(error) => Err(error.into()),
    }
}
199
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty comma-delimited JSON batch still writes the batch
    // prefix/suffix (`[]`, 2 bytes) and reports zero telemetry.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single event in a comma-delimited batch is wrapped in `[...]`
    // with no trailing delimiter.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        // Expected telemetry: the pre-encoding estimated JSON size of the input.
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated inside the `[...]` wrapper,
    // with no delimiter after the final event.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Newline-delimited framing has no batch prefix/suffix, so an empty
    // batch produces no output at all.
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // NDJSON: each event (including the last) is terminated by a newline.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // NDJSON with several events: one newline-terminated line per event.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Single-event (unframed) encoding: raw JSON with no framing bytes.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Single-event text encoding emits just the `message` field's value.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Location of the protobuf fixtures used by the tests below.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf: each message is preceded by a 4-byte
    // big-endian length header; output must match the fixture bytes.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // default LengthDelimitedCoderOptions.length_field_length is 4
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Two identical protobuf messages: each gets its own 4-byte length
    // header (hence the `+ 8` in the written-bytes assertion).
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            // default LengthDelimitedCoderOptions.length_field_length is 4
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}
561}