// vector/sinks/util/encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, config::telemetry,
8    request_metadata::GroupedCountByteSize,
9};
10
11#[cfg(feature = "codecs-arrow")]
12use crate::internal_events::EncoderNullConstraintError;
13use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
14
/// Encodes some input (a single event or a batch of events) into a writer,
/// reporting both the bytes written and telemetry-relevant sizing.
pub trait Encoder<T> {
    /// Encodes the input into the provided writer.
    ///
    /// Returns the number of bytes written along with the grouped estimated
    /// JSON byte size of the encoded events (used for request telemetry).
    ///
    /// # Errors
    ///
    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
28impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
29    fn encode_input(
30        &self,
31        events: Vec<Event>,
32        writer: &mut dyn io::Write,
33    ) -> io::Result<(usize, GroupedCountByteSize)> {
34        let mut encoder = self.1.clone();
35        let mut bytes_written = 0;
36        let mut n_events_pending = events.len();
37        let is_empty = events.is_empty();
38        let batch_prefix = encoder.batch_prefix();
39        write_all(writer, n_events_pending, batch_prefix)?;
40        bytes_written += batch_prefix.len();
41
42        let mut byte_size = telemetry().create_request_count_byte_size();
43
44        for (position, mut event) in events.into_iter().with_position() {
45            self.0.transform(&mut event);
46
47            // Ensure the json size is calculated after any fields have been removed
48            // by the transformer.
49            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51            let mut bytes = BytesMut::new();
52            match (position, encoder.framer()) {
53                (
54                    Position::Last | Position::Only,
55                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56                ) => {
57                    encoder
58                        .serialize(event, &mut bytes)
59                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60                }
61                _ => {
62                    encoder
63                        .encode(event, &mut bytes)
64                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65                }
66            }
67            write_all(writer, n_events_pending, &bytes)?;
68            bytes_written += bytes.len();
69            n_events_pending -= 1;
70        }
71
72        let batch_suffix = encoder.batch_suffix(is_empty);
73        assert!(n_events_pending == 0);
74        write_all(writer, 0, batch_suffix)?;
75        bytes_written += batch_suffix.len();
76
77        Ok((bytes_written, byte_size))
78    }
79}
80
81impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
82    fn encode_input(
83        &self,
84        mut event: Event,
85        writer: &mut dyn io::Write,
86    ) -> io::Result<(usize, GroupedCountByteSize)> {
87        let mut encoder = self.1.clone();
88        self.0.transform(&mut event);
89
90        let mut byte_size = telemetry().create_request_count_byte_size();
91        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93        let mut bytes = BytesMut::new();
94        encoder
95            .serialize(event, &mut bytes)
96            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97        write_all(writer, 1, &bytes)?;
98        Ok((bytes.len(), byte_size))
99    }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, crate::codecs::BatchEncoder) {
    /// Transforms every event and encodes the whole batch in a single call to
    /// the batch encoder, writing the resulting payload to `writer`.
    ///
    /// Schema constraint violations are reported via a dedicated internal
    /// event before being surfaced as `io::ErrorKind::InvalidData`.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // NOTE: `tokio_util::codec::Encoder` is already imported at the top of
        // the file, so no function-local `use` is needed here.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();
        let mut transformed_events = Vec::with_capacity(n_events);

        for mut event in events {
            self.0.transform(&mut event);
            // Ensure the json size is calculated after any fields have been
            // removed by the transformer.
            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
            transformed_events.push(event);
        }

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| {
                if let vector_lib::codecs::encoding::Error::SchemaConstraintViolation(
                    ref constraint_error,
                ) = error
                {
                    emit!(EncoderNullConstraintError {
                        error: constraint_error
                    });
                }
                io::Error::new(io::ErrorKind::InvalidData, error)
            })?;

        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
141
142impl Encoder<Vec<Event>> for (Transformer, crate::codecs::EncoderKind) {
143    fn encode_input(
144        &self,
145        events: Vec<Event>,
146        writer: &mut dyn io::Write,
147    ) -> io::Result<(usize, GroupedCountByteSize)> {
148        // Delegate to the specific encoder implementation
149        match &self.1 {
150            crate::codecs::EncoderKind::Framed(encoder) => {
151                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
152            }
153            #[cfg(feature = "codecs-arrow")]
154            crate::codecs::EncoderKind::Batch(encoder) => {
155                (self.0.clone(), encoder.clone()).encode_input(events, writer)
156            }
157        }
158    }
159}
160
161/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
162/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
163///
164/// # Arguments
165///
166/// * `writer`           - The object implementing io::Write to write data to.
167/// * `n_events_pending` - The number of events that are dropped if this write fails.
168/// * `buf`              - The buffer to write.
169pub fn write_all(
170    writer: &mut dyn io::Write,
171    n_events_pending: usize,
172    buf: &[u8],
173) -> io::Result<()> {
174    writer.write_all(buf).inspect_err(|error| {
175        emit!(EncoderWriteError {
176            error,
177            count: n_events_pending,
178        });
179    })
180}
181
/// Runs `f` against a counting wrapper around `inner` and returns the total
/// number of bytes it successfully wrote.
///
/// Any error produced by `f` is converted into an `io::Error` via `Into`.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    // Forwards writes to `inner` while accumulating the reported byte counts.
    struct CountingWriter<'inner> {
        written: usize,
        inner: &'inner mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)] // We pass on the result of `write` to the caller.
            let count = self.inner.write(buf)?;
            self.written += count;
            Ok(count)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }

    let mut counting = CountingWriter { written: 0, inner };
    f(&mut counting, input).map_err(Into::into)?;
    Ok(counting.written)
}
209
210#[cfg(test)]
211mod tests {
212    use std::{collections::BTreeMap, env, path::PathBuf};
213
214    use bytes::{BufMut, Bytes};
215    use vector_lib::{
216        codecs::{
217            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
218            NewlineDelimitedEncoder, TextSerializerConfig,
219            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
220        },
221        event::LogEvent,
222        internal_event::CountByteSize,
223        json_size::JsonSize,
224    };
225    use vrl::value::{KeyString, Value};
226
227    use super::*;
228
    // An empty batch with a character-delimited JSON framer still produces the
    // batch prefix/suffix (`[]`) and reports zero events and zero JSON size.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }
249
    // A single-event batch is wrapped in the batch prefix/suffix with no
    // delimiter, and the reported size matches the event's estimated JSON size.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }
277
    // Multiple events are comma-delimited between the batch prefix/suffix,
    // with no trailing delimiter before the closing bracket.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }
319
    // With the newline-delimited framer an empty batch produces no output at
    // all (no prefix/suffix).
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }
340
    // A single NDJSON event is written as one JSON line terminated by a newline.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }
367
    // Multiple NDJSON events each occupy their own newline-terminated line,
    // including the last one.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }
407
    // The single-event (unframed) encoder serializes one JSON object with no
    // delimiter or batch wrapper.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }
428
    // The text serializer emits only the `message` field's raw value, while the
    // reported size is still the event's estimated JSON size.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }
449
    // Resolves the protobuf fixture directory relative to the crate root.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }
453
    // A single protobuf message is length-delimited: a 4-byte big-endian length
    // header followed by the serialized message bytes.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // default LengthDelimitedCoderOptions.length_field_length is 4
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
505
    // Multiple protobuf messages are each independently length-delimited, so
    // the output is the concatenation of (4-byte header + message) pairs.
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            // default LengthDelimitedCoderOptions.length_field_length is 4
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
571}