vector/sinks/util/
encoding.rs

1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9    config::telemetry,
10    request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14#[cfg(feature = "codecs-arrow")]
15use vector_lib::codecs::internal_events::EncoderNullConstraintError;
16
/// Abstraction over encoding an input payload of type `T` into a byte stream.
///
/// In this module it is implemented for `(Transformer, …encoder…)` pairs, so
/// sinks can apply field transformations immediately before serialization.
pub trait Encoder<T> {
    /// Encodes the input into the provided writer.
    ///
    /// On success, returns the number of bytes written together with the
    /// grouped telemetry byte sizes of the encoded events (the visible impls
    /// record sizes after the transformer has run).
    ///
    /// # Errors
    ///
    /// If an I/O error is encountered while encoding the input, an error variant will be returned.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
29
30impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
31    fn encode_input(
32        &self,
33        events: Vec<Event>,
34        writer: &mut dyn io::Write,
35    ) -> io::Result<(usize, GroupedCountByteSize)> {
36        let mut encoder = self.1.clone();
37        let mut bytes_written = 0;
38        let mut n_events_pending = events.len();
39        let is_empty = events.is_empty();
40        let batch_prefix = encoder.batch_prefix();
41        write_all(writer, n_events_pending, batch_prefix)?;
42        bytes_written += batch_prefix.len();
43
44        let mut byte_size = telemetry().create_request_count_byte_size();
45
46        for (position, mut event) in events.into_iter().with_position() {
47            self.0.transform(&mut event);
48
49            // Ensure the json size is calculated after any fields have been removed
50            // by the transformer.
51            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
52
53            let mut bytes = BytesMut::new();
54            match (position, encoder.framer()) {
55                (
56                    Position::Last | Position::Only,
57                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
58                ) => {
59                    encoder
60                        .serialize(event, &mut bytes)
61                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
62                }
63                _ => {
64                    encoder
65                        .encode(event, &mut bytes)
66                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
67                }
68            }
69            write_all(writer, n_events_pending, &bytes)?;
70            bytes_written += bytes.len();
71            n_events_pending -= 1;
72        }
73
74        let batch_suffix = encoder.batch_suffix(is_empty);
75        assert!(n_events_pending == 0);
76        write_all(writer, 0, batch_suffix)?;
77        bytes_written += batch_suffix.len();
78
79        Ok((bytes_written, byte_size))
80    }
81}
82
83impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
84    fn encode_input(
85        &self,
86        mut event: Event,
87        writer: &mut dyn io::Write,
88    ) -> io::Result<(usize, GroupedCountByteSize)> {
89        let mut encoder = self.1.clone();
90        self.0.transform(&mut event);
91
92        let mut byte_size = telemetry().create_request_count_byte_size();
93        byte_size.add_event(&event, event.estimated_json_encoded_size_of());
94
95        let mut bytes = BytesMut::new();
96        encoder
97            .serialize(event, &mut bytes)
98            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
99        write_all(writer, 1, &bytes)?;
100        Ok((bytes.len(), byte_size))
101    }
102}
103
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Encodes the whole batch in a single `encode` call (batch encoders such
    /// as Arrow serialize all events at once rather than one at a time).
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // `tokio_util::codec::Encoder` is already imported at module level,
        // so no function-local `use` is needed here.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();

        // Transform first; sizes must be recorded after the transformer has
        // potentially removed fields.
        let transformed_events: Vec<Event> = events
            .into_iter()
            .map(|mut event| {
                self.0.transform(&mut event);
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
                event
            })
            .collect();

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| {
                // Schema-constraint violations get a dedicated internal event
                // for observability; every error maps to `InvalidData`.
                if let vector_lib::codecs::encoding::Error::SchemaConstraintViolation(
                    ref constraint_error,
                ) = error
                {
                    emit!(EncoderNullConstraintError {
                        error: constraint_error
                    });
                }
                io::Error::new(io::ErrorKind::InvalidData, error)
            })?;

        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
143
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
    /// Dispatches batch encoding to the concrete encoder wrapped by
    /// `EncoderKind`.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // Delegate to the specific encoder implementation. The delegate impls
        // are defined on owned `(Transformer, …)` tuples, so the transformer
        // and encoder are cloned here to build that pair.
        match &self.1 {
            vector_lib::codecs::EncoderKind::Framed(encoder) => {
                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
            }
            #[cfg(feature = "codecs-arrow")]
            vector_lib::codecs::EncoderKind::Batch(encoder) => {
                (self.0.clone(), encoder.clone()).encode_input(events, writer)
            }
        }
    }
}
162
163/// Write the buffer to the writer. If the operation fails, emit an internal event which complies with the
164/// instrumentation spec- as this necessitates both an Error and EventsDropped event.
165///
166/// # Arguments
167///
168/// * `writer`           - The object implementing io::Write to write data to.
169/// * `n_events_pending` - The number of events that are dropped if this write fails.
170/// * `buf`              - The buffer to write.
171pub fn write_all(
172    writer: &mut dyn io::Write,
173    n_events_pending: usize,
174    buf: &[u8],
175) -> io::Result<()> {
176    writer.write_all(buf).inspect_err(|error| {
177        emit!(EncoderWriteError {
178            error,
179            count: n_events_pending,
180        });
181    })
182}
183
/// Runs `f` against a wrapper around `inner` that counts the bytes
/// successfully written, and returns that count on success.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Byte-counting proxy that forwards every operation to the wrapped writer.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)] // We pass on the result of `write` to the caller.
            let n_written = self.target.write(buf)?;
            self.written += n_written;
            Ok(n_written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut proxy = CountingWriter {
        written: 0,
        target: inner,
    };
    match f(&mut proxy, input) {
        Ok(()) => Ok(proxy.written),
        Err(error) => Err(error.into()),
    }
}
211
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    /// Builds a JSON batch encoding with `,`-delimited framing (JSON arrays).
    fn json_array_encoding() -> (Transformer, vector_lib::codecs::Encoder<Framer>) {
        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Builds a JSON batch encoding with newline-delimited framing (NDJSON).
    fn ndjson_encoding() -> (Transformer, vector_lib::codecs::Encoder<Framer>) {
        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        )
    }

    /// Creates a log event with a single string field.
    fn single_field_log(key: &str, value: &str) -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from(key),
            Value::from(value),
        )])))
    }

    /// Sums the estimated JSON encoded size over the given events.
    fn total_json_size(events: &[Event]) -> JsonSize {
        events
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum()
    }

    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = json_array_encoding();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_json_single() {
        let encoding = json_array_encoding();

        let mut writer = Vec::new();
        let input = vec![single_field_log("key", "value")];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = json_array_encoding();

        let input = vec![
            single_field_log("key", "value1"),
            single_field_log("key", "value2"),
            single_field_log("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = ndjson_encoding();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = ndjson_encoding();

        let mut writer = Vec::new();
        let input = vec![single_field_log("key", "value")];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = ndjson_encoding();

        let mut writer = Vec::new();
        let input = vec![
            single_field_log("key", "value1"),
            single_field_log("key", "value2"),
            single_field_log("key", "value3"),
        ];
        let input_json_size = total_json_size(&input);

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = single_field_log("key", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = single_field_log("message", "value");
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Builds a protobuf batch encoding with length-delimited framing, using
    /// the `test_proto.User` descriptor fixture.
    fn protobuf_encoding() -> (Transformer, vector_lib::codecs::Encoder<Framer>) {
        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        )
    }

    /// Creates the log event matching the `test_proto.User` fixture message.
    fn proto_user_event() -> Event {
        Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))
    }

    /// Appends `message` to `buf` with a 4-byte length prefix (the default
    /// `LengthDelimitedCoderOptions.length_field_length` is 4).
    fn length_delimit(buf: &mut BytesMut, message: &[u8]) {
        buf.reserve(4 + message.len());
        buf.put_uint(message.len() as u64, 4);
        buf.extend_from_slice(message);
    }

    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        let mut buf = BytesMut::with_capacity(64);
        length_delimit(&mut buf, &message_raw);
        let expected_bytes = buf.freeze();

        let encoding = protobuf_encoding();

        let mut writer = Vec::new();
        let input = vec![proto_user_event()];
        let input_json_size = total_json_size(&input);

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        let mut buf = BytesMut::with_capacity(128);
        for message in &messages {
            length_delimit(&mut buf, message);
        }
        let expected_bytes = buf.freeze();

        let encoding = protobuf_encoding();

        let mut writer = Vec::new();
        let input = vec![proto_user_event(), proto_user_event()];
        let input_json_size = total_json_size(&input);

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}