1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, config::telemetry,
8 request_metadata::GroupedCountByteSize,
9};
10
11#[cfg(feature = "codecs-arrow")]
12use crate::internal_events::EncoderNullConstraintError;
13use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
14
/// Encodes an input (a single event or a batch of events) into a writer.
///
/// Implementations return the number of bytes written together with the
/// grouped count/byte-size telemetry accumulated for the encoded events.
pub trait Encoder<T> {
    /// Encodes `input` into `writer`.
    ///
    /// Returns `(bytes_written, grouped_count_byte_size)` on success.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` if serializing an event fails (mapped to
    /// `InvalidData` by the implementations below) or if writing to
    /// `writer` fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
27
28impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
29 fn encode_input(
30 &self,
31 events: Vec<Event>,
32 writer: &mut dyn io::Write,
33 ) -> io::Result<(usize, GroupedCountByteSize)> {
34 let mut encoder = self.1.clone();
35 let mut bytes_written = 0;
36 let mut n_events_pending = events.len();
37 let is_empty = events.is_empty();
38 let batch_prefix = encoder.batch_prefix();
39 write_all(writer, n_events_pending, batch_prefix)?;
40 bytes_written += batch_prefix.len();
41
42 let mut byte_size = telemetry().create_request_count_byte_size();
43
44 for (position, mut event) in events.into_iter().with_position() {
45 self.0.transform(&mut event);
46
47 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
50
51 let mut bytes = BytesMut::new();
52 match (position, encoder.framer()) {
53 (
54 Position::Last | Position::Only,
55 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
56 ) => {
57 encoder
58 .serialize(event, &mut bytes)
59 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
60 }
61 _ => {
62 encoder
63 .encode(event, &mut bytes)
64 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
65 }
66 }
67 write_all(writer, n_events_pending, &bytes)?;
68 bytes_written += bytes.len();
69 n_events_pending -= 1;
70 }
71
72 let batch_suffix = encoder.batch_suffix(is_empty);
73 assert!(n_events_pending == 0);
74 write_all(writer, 0, batch_suffix)?;
75 bytes_written += batch_suffix.len();
76
77 Ok((bytes_written, byte_size))
78 }
79}
80
81impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
82 fn encode_input(
83 &self,
84 mut event: Event,
85 writer: &mut dyn io::Write,
86 ) -> io::Result<(usize, GroupedCountByteSize)> {
87 let mut encoder = self.1.clone();
88 self.0.transform(&mut event);
89
90 let mut byte_size = telemetry().create_request_count_byte_size();
91 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
92
93 let mut bytes = BytesMut::new();
94 encoder
95 .serialize(event, &mut bytes)
96 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
97 write_all(writer, 1, &bytes)?;
98 Ok((bytes.len(), byte_size))
99 }
100}
101
#[cfg(feature = "codecs-arrow")]
impl Encoder<Vec<Event>> for (Transformer, crate::codecs::BatchEncoder) {
    /// Encodes an entire batch of events in a single encoder call rather
    /// than event-by-event (used by batch-oriented codecs such as Arrow).
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        // NOTE: `tokio_util::codec::Encoder` is already imported at module
        // level, so the `encode` call below resolves without a local `use`.
        let mut encoder = self.1.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let n_events = events.len();
        let mut transformed_events = Vec::with_capacity(n_events);

        // Transform and account for each event before handing the whole
        // batch to the encoder in one call.
        for mut event in events {
            self.0.transform(&mut event);
            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
            transformed_events.push(event);
        }

        let mut bytes = BytesMut::new();
        encoder
            .encode(transformed_events, &mut bytes)
            .map_err(|error| {
                // Surface schema-constraint violations as a dedicated
                // internal event before converting to an `io::Error`.
                if let vector_lib::codecs::encoding::Error::SchemaConstraintViolation(
                    ref constraint_error,
                ) = error
                {
                    emit!(EncoderNullConstraintError {
                        error: constraint_error
                    });
                }
                io::Error::new(io::ErrorKind::InvalidData, error)
            })?;

        write_all(writer, n_events, &bytes)?;
        Ok((bytes.len(), byte_size))
    }
}
141
impl Encoder<Vec<Event>> for (Transformer, crate::codecs::EncoderKind) {
    /// Dispatches batch encoding to the concrete encoder variant, delegating
    /// to the matching tuple `Encoder` impl above.
    fn encode_input(
        &self,
        events: Vec<Event>,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        match &self.1 {
            // NOTE(review): the deref after `clone()` suggests the framed
            // encoder is boxed inside the enum variant — confirm against
            // `EncoderKind`'s declaration.
            crate::codecs::EncoderKind::Framed(encoder) => {
                (self.0.clone(), *encoder.clone()).encode_input(events, writer)
            }
            #[cfg(feature = "codecs-arrow")]
            crate::codecs::EncoderKind::Batch(encoder) => {
                (self.0.clone(), encoder.clone()).encode_input(events, writer)
            }
        }
    }
}
160
161pub fn write_all(
170 writer: &mut dyn io::Write,
171 n_events_pending: usize,
172 buf: &[u8],
173) -> io::Result<()> {
174 writer.write_all(buf).inspect_err(|error| {
175 emit!(EncoderWriteError {
176 error,
177 count: n_events_pending,
178 });
179 })
180}
181
/// Runs `f` against a counting wrapper around `inner` and returns the total
/// number of bytes `f` successfully wrote.
///
/// # Errors
///
/// Propagates `f`'s error (converted into an `io::Error`) or any underlying
/// write error.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    // Pass-through writer that tallies every byte accepted by the target.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Raw `write` is presumably disallowed by the project's clippy
            // config; this wrapper is the sanctioned call site.
            #[allow(clippy::disallowed_methods)]
            let accepted = self.target.write(buf)?;
            self.written += accepted;
            Ok(accepted)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counting = CountingWriter {
        written: 0,
        target: inner,
    };
    f(&mut counting, input).map_err(Into::into)?;
    Ok(counting.written)
}
209
#[cfg(test)]
mod tests {
    //! Tests for the `Encoder` impls: comma-delimited JSON batches,
    //! newline-delimited JSON batches, single-event JSON/text encoding,
    //! and length-delimited protobuf batches read from fixture files.

    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty comma-delimited JSON batch still emits the `[]` wrapper.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single event is wrapped in `[...]` with no trailing delimiter.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated; only the last omits the delimiter.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Newline-delimited framing has no batch prefix/suffix: empty input
    // produces zero bytes.
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // NOTE(review): unlike the comma-delimited case, a single NDJSON event
    // carries a trailing newline — confirm this matches the framer's
    // batch-suffix behavior for `Position::Only`.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Each NDJSON event, including the last, ends with a newline.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Single-event JSON encoding: no framing at all (no trailing newline).
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Text serializer emits only the message value, unquoted.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Resolves the protobuf fixture directory relative to the crate root.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf: each message is prefixed with a 4-byte
    // big-endian length, matching the expected bytes built via `put_uint`.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // Build the expected framing by hand: 4-byte length prefix + payload.
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Two identical messages: each gets its own 4-byte length prefix.
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        // Expected output: [len][payload] repeated per message.
        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}