1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 codecs::{Transformer, encoding::Framer, internal_events::EncoderWriteError},
9 config::telemetry,
10 request_metadata::GroupedCountByteSize,
11};
12
13use crate::event::Event;
14
/// Abstraction over writing an encoded representation of some input to an
/// [`io::Write`] sink.
pub trait Encoder<T> {
    /// Encodes `input` into `writer`.
    ///
    /// On success, returns the number of bytes written together with the
    /// [`GroupedCountByteSize`] telemetry accumulated for the encoded events.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if serialization fails or the underlying
    /// writer reports an error.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::Encoder<Framer>) {
    /// Encodes a batch of events, framing them with the configured [`Framer`]
    /// and wrapping the output in the encoder's batch prefix/suffix.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // Clone so encoding can use the encoder mutably without requiring
        // `&mut self` on the trait.
        let mut encoder = self.1.clone();
        let mut bytes_written = 0;
        // Tracks how many events have not yet been written; reported to
        // `write_all` so write errors carry an accurate dropped-event count.
        let mut n_events_pending = events.len();
        let is_empty = events.is_empty();
        let batch_prefix = encoder.batch_prefix();
        write_all(writer, n_events_pending, batch_prefix)?;
        bytes_written += batch_prefix.len();

        let mut byte_size = telemetry().create_request_count_byte_size();

        for (position, mut event) in events.into_iter().with_position() {
            // Apply field/schema transformations before measuring, so the
            // telemetry size reflects what is actually encoded.
            self.0.transform(&mut event);

            byte_size.add_event(&event, event.estimated_json_encoded_size_of());

            let mut bytes = BytesMut::new();
            match (position, encoder.framer()) {
                (
                    Position::Last | Position::Only,
                    Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
                ) => {
                    // Final event with a delimiter-based framer: `serialize`
                    // is used instead of `encode` — presumably to avoid a
                    // trailing delimiter after the last event (NOTE(review):
                    // confirm against the codecs crate; newline framing below
                    // still emits a trailing newline via `encode`).
                    encoder
                        .serialize(event, &mut bytes)
                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
                }
                _ => {
                    encoder
                        .encode(event, &mut bytes)
                        .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
                }
            }
            write_all(writer, n_events_pending, &bytes)?;
            bytes_written += bytes.len();
            n_events_pending -= 1;
        }

        let batch_suffix = encoder.batch_suffix(is_empty);
        // Every event must have been written (and counted down) by now.
        assert!(n_events_pending == 0);
        write_all(writer, 0, batch_suffix)?;
        bytes_written += batch_suffix.len();

        Ok((bytes_written, byte_size))
    }
}
80
81impl Encoder<Event> for (Transformer, vector_lib::codecs::Encoder<()>) {
82 fn encode_input(
83 &self,
84 mut event: Event,
85 writer: &mut dyn io::Write,
86 ) -> io::Result<(usize, GroupedCountByteSize)> {
87 let mut encoder = self.1.clone();
88 self.0.transform(&mut event);
89
90 let mut byte_size = telemetry().create_request_count_byte_size();
91 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93 let mut bytes = BytesMut::new();
94 encoder
95 .serialize(event, &mut bytes)
96 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97 write_all(writer, 1, &bytes)?;
98 Ok((bytes.len(), byte_size))
99 }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::BatchEncoder) {
    /// Encodes the whole batch in a single call to the batch encoder.
    ///
    /// Each event is transformed and measured for telemetry first, then the
    /// complete batch is handed to the encoder and the result written out.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if batch encoding fails (`InvalidData`) or
    /// the underlying writer reports an error.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // NOTE: the function-local `use tokio_util::codec::Encoder as _;`
        // was removed — the trait is already imported at the top of the file.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();
        let mut transformed_events = Vec::with_capacity(n_events);

        // Transform and record telemetry per event before batch encoding.
        for mut event in events {
            self.0.transform(&mut event);
            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
            transformed_events.push(event);
        }

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;

        // All `n_events` are pending if this single write fails.
        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
131
132impl Encoder<Vec<Event>> for (Transformer, vector_lib::codecs::EncoderKind) {
133 fn encode_input(
134 &self,
135 events: Vec<Event>,
136 writer: &mut dyn io::Write,
137 ) -> io::Result<(usize, GroupedCountByteSize)> {
138 match &self.1 {
140 vector_lib::codecs::EncoderKind::Framed(encoder) => {
141 (self.0.clone(), *encoder.clone()).encode_input(events, writer)
142 }
143 #[cfg(feature = "codecs-arrow")]
144 vector_lib::codecs::EncoderKind::Batch(encoder) => {
145 (self.0.clone(), encoder.clone()).encode_input(events, writer)
146 }
147 }
148 }
149}
150
151pub fn write_all(
160 writer: &mut dyn io::Write,
161 n_events_pending: usize,
162 buf: &[u8],
163) -> io::Result<()> {
164 writer.write_all(buf).inspect_err(|error| {
165 emit!(EncoderWriteError {
166 error,
167 count: n_events_pending,
168 });
169 })
170}
171
/// Runs `f` against a wrapper around `inner` that counts every byte it
/// successfully writes, returning that byte count when `f` succeeds.
///
/// # Errors
///
/// Propagates any error produced by `f`, converted into an [`io::Error`].
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Forwarding writer that accumulates the number of bytes written.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // The raw `write` is deliberate: partial writes must be observed
            // so the count stays accurate.
            #[allow(clippy::disallowed_methods)]
            let written = self.target.write(buf)?;
            self.written += written;
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counting = CountingWriter {
        written: 0,
        target: inner,
    };
    match f(&mut counting, input) {
        Ok(()) => Ok(counting.written),
        Err(error) => Err(error.into()),
    }
}
199
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty batch with a comma-delimited JSON framer still produces the
    // batch envelope ("[]") and zero telemetry.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single event is wrapped in the JSON array envelope with no trailing
    // delimiter.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated inside the array; the last event
    // gets no trailing comma.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Newline-delimited framing has no batch envelope, so an empty batch
    // writes nothing at all.
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // NDJSON keeps the trailing newline even for the last (single) event.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Each NDJSON event is terminated by its own newline, including the last.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // The single-event (unframed) path writes the serialized JSON object
    // with no framing bytes at all.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Text serialization emits only the message field's raw value, while
    // telemetry still measures the estimated JSON size of the whole event.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Location of the protobuf fixtures (descriptor + expected message bytes)
    // relative to the crate root.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf: each message is preceded by a 4-byte
    // big-endian length header; output must match the pre-encoded fixture.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // Build the expected bytes: 4-byte length prefix followed by the
        // fixture's raw message body.
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Same as the single-message test but with two identical events: each
    // message carries its own 4-byte length header.
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        // Expected output: [len][msg][len][msg].
        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            vector_lib::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}