1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf, codecs::encoding::Framer, config::telemetry,
8 request_metadata::GroupedCountByteSize,
9};
10
11use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
12
/// Abstraction over encoding an input (a single event or a batch of events)
/// into a raw byte stream.
///
/// Implementations return the number of bytes written to `writer` together
/// with the [`GroupedCountByteSize`] telemetry accounting for the encoded
/// events.
pub trait Encoder<T> {
    /// Encodes `input` into `writer`.
    ///
    /// Returns the number of bytes written and the estimated JSON byte size
    /// of the encoded events (for telemetry), or an `io::Error` if
    /// serialization or the underlying write fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
25
26impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
27 fn encode_input(
28 &self,
29 events: Vec<Event>,
30 writer: &mut dyn io::Write,
31 ) -> io::Result<(usize, GroupedCountByteSize)> {
32 let mut encoder = self.1.clone();
33 let mut bytes_written = 0;
34 let mut n_events_pending = events.len();
35 let is_empty = events.is_empty();
36 let batch_prefix = encoder.batch_prefix();
37 write_all(writer, n_events_pending, batch_prefix)?;
38 bytes_written += batch_prefix.len();
39
40 let mut byte_size = telemetry().create_request_count_byte_size();
41
42 for (position, mut event) in events.into_iter().with_position() {
43 self.0.transform(&mut event);
44
45 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
48
49 let mut bytes = BytesMut::new();
50 match (position, encoder.framer()) {
51 (
52 Position::Last | Position::Only,
53 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
54 ) => {
55 encoder
56 .serialize(event, &mut bytes)
57 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
58 }
59 _ => {
60 encoder
61 .encode(event, &mut bytes)
62 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
63 }
64 }
65 write_all(writer, n_events_pending, &bytes)?;
66 bytes_written += bytes.len();
67 n_events_pending -= 1;
68 }
69
70 let batch_suffix = encoder.batch_suffix(is_empty);
71 assert!(n_events_pending == 0);
72 write_all(writer, 0, batch_suffix)?;
73 bytes_written += batch_suffix.len();
74
75 Ok((bytes_written, byte_size))
76 }
77}
78
79impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
80 fn encode_input(
81 &self,
82 mut event: Event,
83 writer: &mut dyn io::Write,
84 ) -> io::Result<(usize, GroupedCountByteSize)> {
85 let mut encoder = self.1.clone();
86 self.0.transform(&mut event);
87
88 let mut byte_size = telemetry().create_request_count_byte_size();
89 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
90
91 let mut bytes = BytesMut::new();
92 encoder
93 .serialize(event, &mut bytes)
94 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
95 write_all(writer, 1, &bytes)?;
96 Ok((bytes.len(), byte_size))
97 }
98}
99
100pub fn write_all(
109 writer: &mut dyn io::Write,
110 n_events_pending: usize,
111 buf: &[u8],
112) -> io::Result<()> {
113 writer.write_all(buf).inspect_err(|error| {
114 emit!(EncoderWriteError {
115 error,
116 count: n_events_pending,
117 });
118 })
119}
120
/// Runs `f` against a wrapper of `inner` that counts the bytes successfully
/// written, and returns that count.
///
/// # Errors
///
/// Returns any error produced by `f` (converted into an `io::Error`) or by
/// the underlying writer.
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    /// Forwarding writer that accumulates the number of bytes written.
    struct CountingWriter<'a> {
        written: usize,
        target: &'a mut dyn io::Write,
    }

    impl io::Write for CountingWriter<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)]
            let n = self.target.write(buf)?;
            self.written += n;
            Ok(n)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.target.flush()
        }
    }

    let mut counting = CountingWriter {
        written: 0,
        target: inner,
    };
    f(&mut counting, input).map_err(Into::into)?;
    Ok(counting.written)
}
148
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, env, path::PathBuf};

    use bytes::{BufMut, Bytes};
    use vector_lib::{
        codecs::{
            CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
            NewlineDelimitedEncoder, TextSerializerConfig,
            encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions},
        },
        event::LogEvent,
        internal_event::CountByteSize,
        json_size::JsonSize,
    };
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty comma-delimited JSON batch still writes the batch
    // prefix/suffix ("[]") and reports zero events / zero JSON bytes.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single-event JSON batch is wrapped in brackets with no trailing
    // delimiter after the only element.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        // Expected telemetry size is the sum of the inputs' estimated JSON sizes.
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated, with no comma after the last one.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // An empty newline-delimited batch produces no output at all (the
    // newline framer has no batch prefix/suffix for an empty batch).
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // Newline-delimited framing appends a trailing newline even after the
    // last (only) event.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Each of the multiple events gets its own newline-terminated line.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Single-event (unframed) encoding emits the bare JSON object.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Text serialization writes only the event's message field value.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Resolves the protobuf fixture directory relative to the crate root.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf framing: each message is preceded by a
    // 4-byte length prefix, so the expected output is prefix + raw message.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // Build the expected framed bytes: 4-byte length, then the payload.
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        // Event matching the test_proto.User message in the fixture.
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        // Written size = payload + 4-byte length prefix.
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Two identical protobuf messages each get their own length prefix;
    // there is no batch prefix/suffix for length-delimited framing.
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        // Expected output: (4-byte length + payload) per message.
        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
                use_json_names: false,
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        // Two messages => two 4-byte length prefixes.
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}
510}