1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9 config::telemetry,
10 request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14#[cfg(feature = "codecs-arrow")]
15use vector_lib::codecs::internal_events::EncoderNullConstraintError;
16
/// An encoder that serializes an input (a single event or a batch of events)
/// into a writer.
pub trait Encoder<T> {
    /// Encodes `input` and writes the serialized bytes to `writer`.
    ///
    /// On success, returns the number of bytes written together with the
    /// grouped telemetry byte sizes of the encoded events.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if serialization fails (mapped to
    /// `InvalidData` by implementations) or the underlying writer fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
29
30impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
31 fn encode_input(
32 &self,
33 events: Vec<Event>,
34 writer: &mut dyn io::Write,
35 ) -> io::Result<(usize, GroupedCountByteSize)> {
36 let mut encoder = self.1.clone();
37 let mut bytes_written = 0;
38 let mut n_events_pending = events.len();
39 let is_empty = events.is_empty();
40 let batch_prefix = encoder.batch_prefix();
41 write_all(writer, n_events_pending, batch_prefix)?;
42 bytes_written += batch_prefix.len();
43
44 let mut byte_size = telemetry().create_request_count_byte_size();
45
46 for (position, mut event) in events.into_iter().with_position() {
47 self.0.transform(&mut event);
48
49 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
52
53 let mut bytes = BytesMut::new();
54 match (position, encoder.framer()) {
55 (
56 Position::Last | Position::Only,
57 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
58 ) => {
59 encoder
60 .serialize(event, &mut bytes)
61 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
62 }
63 _ => {
64 encoder
65 .encode(event, &mut bytes)
66 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
67 }
68 }
69 write_all(writer, n_events_pending, &bytes)?;
70 bytes_written += bytes.len();
71 n_events_pending -= 1;
72 }
73
74 let batch_suffix = encoder.batch_suffix(is_empty);
75 assert!(n_events_pending == 0);
76 write_all(writer, 0, batch_suffix)?;
77 bytes_written += batch_suffix.len();
78
79 Ok((bytes_written, byte_size))
80 }
81}
82
83impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
84 fn encode_input(
85 &self,
86 mut event: Event,
87 writer: &mut dyn io::Write,
88 ) -> io::Result<(usize, GroupedCountByteSize)> {
89 let mut encoder = self.1.clone();
90 self.0.transform(&mut event);
91
92 let mut byte_size = telemetry().create_request_count_byte_size();
93 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
94
95 let mut bytes = BytesMut::new();
96 encoder
97 .serialize(event, &mut bytes)
98 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
99 write_all(writer, 1, &bytes)?;
100 Ok((bytes.len(), byte_size))
101 }
102}
103
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Encodes the entire batch in one shot (e.g. into an Arrow payload),
    /// applying the transformer to each event first.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // NOTE: `tokio_util::codec::Encoder` is already imported at file
        // scope, so no function-local `use` is needed here.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();

        // Transform each event and record its post-transform telemetry size.
        let transformed_events: Vec<Event> = events
            .into_iter()
            .map(|mut event| {
                self.0.transform(&mut event);
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
                event
            })
            .collect();

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| {
                // Surface schema constraint violations as a dedicated
                // internal event before mapping to an `InvalidData` error.
                if let vector_lib::codecs::encoding::Error::SchemaConstraintViolation(
                    ref constraint_error,
                ) = error
                {
                    emit!(EncoderNullConstraintError {
                        error: constraint_error
                    });
                }
                io::Error::new(io::ErrorKind::InvalidData, error)
            })?;

        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
143
144impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
145 fn encode_input(
146 &self,
147 events: Vec<Event>,
148 writer: &mut dyn io::Write,
149 ) -> io::Result<(usize, GroupedCountByteSize)> {
150 match &self.1 {
152 vector_lib::codecs::EncoderKind::Framed(encoder) => {
153 (self.0.clone(), *encoder.clone()).encode_input(events, writer)
154 }
155 #[cfg(feature = "codecs-arrow")]
156 vector_lib::codecs::EncoderKind::Batch(encoder) => {
157 (self.0.clone(), encoder.clone()).encode_input(events, writer)
158 }
159 }
160 }
161}
162
163pub fn write_all(
172 writer: &mut dyn io::Write,
173 n_events_pending: usize,
174 buf: &[u8],
175) -> io::Result<()> {
176 writer.write_all(buf).inspect_err(|error| {
177 emit!(EncoderWriteError {
178 error,
179 count: n_events_pending,
180 });
181 })
182}
183
/// Runs `f` against a counting wrapper around `inner` and, on success,
/// returns the total number of bytes that were actually written.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    // Forwards writes to the wrapped writer while accumulating the number of
    // bytes the writer reports as accepted.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Partial writes are possible; count only what was accepted.
            #[allow(clippy::disallowed_methods)]
            let accepted = self.target.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counting = CountingWriter {
        written: 0,
        target: inner,
    };
    match f(&mut counting, input) {
        Ok(()) => Ok(counting.written),
        Err(error) => Err(error.into()),
    }
}
211
#[cfg(test)]
mod tests {
    //! Golden tests for the batch/single-event encoders: each test pins the
    //! exact byte count, exact serialized output, and the telemetry
    //! `CountByteSize` reported by `encode_input`.
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty comma-delimited JSON batch still emits the `[]` wrapper
    // (prefix + suffix, 2 bytes) with zero events counted.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single event in a comma-delimited batch gets no trailing delimiter.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        // Telemetry size is computed up front from the untransformed input
        // (the default transformer is a no-op, so the sizes must match).
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated inside the `[...]` wrapper; only
    // the last event is written without a delimiter.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Newline-delimited framing has no batch prefix/suffix, so an empty
    // batch writes nothing at all.
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // NDJSON output: each line is terminated with `\n`, including the last.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Three NDJSON events: one `{"key":"valueN"}\n` line per event.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Single-event (unframed) JSON encoding: no delimiters, no trailing
    // newline.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Text serialization writes only the message field's raw value.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Location of the protobuf fixtures (descriptor + pre-encoded message),
    // resolved relative to the crate root.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf: the framer prepends a 4-byte big-endian
    // length header to the serialized message; the expected bytes are built
    // from the pre-encoded fixture.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        // 4-byte length header followed by the raw protobuf payload.
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        // 49 is the fixture's serialized size; +4 for the length header.
        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Two identical protobuf events: each message gets its own 4-byte length
    // header (hence the `+ 8` in the total).
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}
573}