1use std::io;
2
3use bytes::BytesMut;
4use itertools::{Itertools, Position};
5use tokio_util::codec::Encoder as _;
6use vector_lib::codecs::encoding::Framer;
7use vector_lib::request_metadata::GroupedCountByteSize;
8use vector_lib::{config::telemetry, EstimatedJsonEncodedSizeOf};
9
10use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
11
/// Encodes an input (a single event or a batch) into a raw byte writer.
pub trait Encoder<T> {
    /// Encodes `input` into `writer`.
    ///
    /// On success, returns the number of bytes written to `writer` together
    /// with the estimated JSON-encoded size of the input, grouped for
    /// telemetry reporting.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] if serialization fails or the underlying
    /// write fails.
    fn encode_input(
        &self,
        input: T,
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)>;
}
24
25impl Encoder<Vec<Event>> for (Transformer, crate::codecs::Encoder<Framer>) {
26 fn encode_input(
27 &self,
28 events: Vec<Event>,
29 writer: &mut dyn io::Write,
30 ) -> io::Result<(usize, GroupedCountByteSize)> {
31 let mut encoder = self.1.clone();
32 let mut bytes_written = 0;
33 let mut n_events_pending = events.len();
34 let is_empty = events.is_empty();
35 let batch_prefix = encoder.batch_prefix();
36 write_all(writer, n_events_pending, batch_prefix)?;
37 bytes_written += batch_prefix.len();
38
39 let mut byte_size = telemetry().create_request_count_byte_size();
40
41 for (position, mut event) in events.into_iter().with_position() {
42 self.0.transform(&mut event);
43
44 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
47
48 let mut bytes = BytesMut::new();
49 match (position, encoder.framer()) {
50 (
51 Position::Last | Position::Only,
52 Framer::CharacterDelimited(_) | Framer::NewlineDelimited(_),
53 ) => {
54 encoder
55 .serialize(event, &mut bytes)
56 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
57 }
58 _ => {
59 encoder
60 .encode(event, &mut bytes)
61 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
62 }
63 }
64 write_all(writer, n_events_pending, &bytes)?;
65 bytes_written += bytes.len();
66 n_events_pending -= 1;
67 }
68
69 let batch_suffix = encoder.batch_suffix(is_empty);
70 assert!(n_events_pending == 0);
71 write_all(writer, 0, batch_suffix)?;
72 bytes_written += batch_suffix.len();
73
74 Ok((bytes_written, byte_size))
75 }
76}
77
78impl Encoder<Event> for (Transformer, crate::codecs::Encoder<()>) {
79 fn encode_input(
80 &self,
81 mut event: Event,
82 writer: &mut dyn io::Write,
83 ) -> io::Result<(usize, GroupedCountByteSize)> {
84 let mut encoder = self.1.clone();
85 self.0.transform(&mut event);
86
87 let mut byte_size = telemetry().create_request_count_byte_size();
88 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
89
90 let mut bytes = BytesMut::new();
91 encoder
92 .serialize(event, &mut bytes)
93 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
94 write_all(writer, 1, &bytes)?;
95 Ok((bytes.len(), byte_size))
96 }
97}
98
99pub fn write_all(
108 writer: &mut dyn io::Write,
109 n_events_pending: usize,
110 buf: &[u8],
111) -> io::Result<()> {
112 writer.write_all(buf).inspect_err(|error| {
113 emit!(EncoderWriteError {
114 error,
115 count: n_events_pending,
116 });
117 })
118}
119
/// Runs `f` against a wrapper around `inner` that counts the bytes actually
/// accepted by `inner`, returning that count on success.
///
/// # Errors
///
/// Converts any error returned by `f` into an [`io::Error`].
pub fn as_tracked_write<F, I, E>(inner: &mut dyn io::Write, input: I, f: F) -> io::Result<usize>
where
    F: FnOnce(&mut dyn io::Write, I) -> Result<(), E>,
    E: Into<io::Error> + 'static,
{
    // Byte-counting shim: forwards every call to the wrapped writer and
    // accumulates how many bytes each `write` reports as written.
    struct Tracked<'inner> {
        count: usize,
        inner: &'inner mut dyn io::Write,
    }

    impl io::Write for Tracked<'_> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            #[allow(clippy::disallowed_methods)]
            let written = self.inner.write(buf)?;
            self.count += written;
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.inner.flush()
        }
    }

    let mut tracker = Tracked { count: 0, inner };
    match f(&mut tracker, input) {
        Ok(()) => Ok(tracker.count),
        Err(error) => Err(error.into()),
    }
}
147
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::env;
    use std::path::PathBuf;

    use bytes::{BufMut, Bytes};
    use vector_lib::codecs::encoding::{ProtobufSerializerConfig, ProtobufSerializerOptions};
    use vector_lib::codecs::{
        CharacterDelimitedEncoder, JsonSerializerConfig, LengthDelimitedEncoder,
        NewlineDelimitedEncoder, TextSerializerConfig,
    };
    use vector_lib::event::LogEvent;
    use vector_lib::{internal_event::CountByteSize, json_size::JsonSize};
    use vrl::value::{KeyString, Value};

    use super::*;

    // An empty comma-delimited JSON batch still writes the batch prefix and
    // suffix (`[]`) and reports zero events of zero JSON size.
    #[test]
    fn test_encode_batch_json_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 2);

        assert_eq!(String::from_utf8(writer).unwrap(), "[]");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single-event comma-delimited JSON batch is bracketed with no
    // trailing delimiter after the only event.
    #[test]
    fn test_encode_batch_json_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];

        // Expected telemetry size is the sum of the events' estimated JSON sizes.
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 17);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"[{"key":"value"}]"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Multiple events are comma-separated inside the brackets; only the
    // non-final events get a trailing delimiter.
    #[test]
    fn test_encode_batch_json_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                CharacterDelimitedEncoder::new(b',').into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 52);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            r#"[{"key":"value1"},{"key":"value2"},{"key":"value3"}]"#
        );

        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // An empty newline-delimited batch writes nothing at all (no prefix or
    // suffix, unlike the character-delimited case above).
    #[test]
    fn test_encode_batch_ndjson_empty() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let (written, json_size) = encoding.encode_input(vec![], &mut writer).unwrap();
        assert_eq!(written, 0);

        assert_eq!(String::from_utf8(writer).unwrap(), "");
        assert_eq!(
            CountByteSize(0, JsonSize::zero()),
            json_size.size().unwrap()
        );
    }

    // A single NDJSON event is terminated by a newline.
    #[test]
    fn test_encode_batch_ndjson_single() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])))];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 16);

        assert_eq!(String::from_utf8(writer).unwrap(), "{\"key\":\"value\"}\n");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Each NDJSON event, including the last, is followed by a newline.
    #[test]
    fn test_encode_batch_ndjson_multiple() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                NewlineDelimitedEncoder::default().into(),
                JsonSerializerConfig::default().build().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value1"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value2"),
            )]))),
            Event::Log(LogEvent::from(BTreeMap::from([(
                KeyString::from("key"),
                Value::from("value3"),
            )]))),
        ];
        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 51);

        assert_eq!(
            String::from_utf8(writer).unwrap(),
            "{\"key\":\"value1\"}\n{\"key\":\"value2\"}\n{\"key\":\"value3\"}\n"
        );
        assert_eq!(CountByteSize(3, input_json_size), json_size.size().unwrap());
    }

    // Single-event (unframed) JSON encoding: bare object, no brackets or
    // trailing newline.
    #[test]
    fn test_encode_event_json() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("key"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 15);

        assert_eq!(String::from_utf8(writer).unwrap(), r#"{"key":"value"}"#);
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Single-event text encoding emits only the raw message value, while the
    // reported telemetry size is still the estimated JSON size.
    #[test]
    fn test_encode_event_text() {
        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<()>::new(TextSerializerConfig::default().build().into()),
        );

        let mut writer = Vec::new();
        let input = Event::Log(LogEvent::from(BTreeMap::from([(
            KeyString::from("message"),
            Value::from("value"),
        )])));
        let input_json_size = input.estimated_json_encoded_size_of();

        let (written, json_size) = encoding.encode_input(input, &mut writer).unwrap();
        assert_eq!(written, 5);

        assert_eq!(String::from_utf8(writer).unwrap(), r"value");
        assert_eq!(CountByteSize(1, input_json_size), json_size.size().unwrap());
    }

    // Location of the protobuf fixture files used by the tests below,
    // resolved relative to this crate's manifest directory.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    // Length-delimited protobuf framing: a single encoded message is prefixed
    // with a 4-byte big-endian length header.
    #[test]
    fn test_encode_batch_protobuf_single() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let input_proto_size = message_raw.len();

        // Build the expected output by hand: 4-byte length prefix + payload.
        let mut buf = BytesMut::with_capacity(64);
        buf.reserve(4 + input_proto_size);
        buf.put_uint(input_proto_size as u64, 4);
        buf.extend_from_slice(&message_raw[..]);
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        // Event matching the serialized fixture in test_proto.pb.
        let input = vec![Event::Log(LogEvent::from(BTreeMap::from([
            (KeyString::from("id"), Value::from("123")),
            (KeyString::from("name"), Value::from("Alice")),
            (KeyString::from("age"), Value::from(30)),
            (
                KeyString::from("emails"),
                Value::from(vec!["alice@example.com", "alice@work.com"]),
            ),
        ])))];

        let input_json_size = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum::<JsonSize>();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(input_proto_size, 49);
        assert_eq!(written, input_proto_size + 4);
        assert_eq!(CountByteSize(1, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }

    // Length-delimited protobuf framing with two identical messages: each
    // message gets its own 4-byte length prefix (no batch prefix/suffix).
    #[test]
    fn test_encode_batch_protobuf_multiple() {
        let message_raw = std::fs::read(test_data_dir().join("test_proto.pb")).unwrap();
        let messages = vec![message_raw.clone(), message_raw.clone()];
        let total_input_proto_size: usize = messages.iter().map(|m| m.len()).sum();

        // Build the expected output by hand: per-message length prefix + payload.
        let mut buf = BytesMut::with_capacity(128);
        for message in messages {
            buf.reserve(4 + message.len());
            buf.put_uint(message.len() as u64, 4);
            buf.extend_from_slice(&message[..]);
        }
        let expected_bytes = buf.freeze();

        let config = ProtobufSerializerConfig {
            protobuf: ProtobufSerializerOptions {
                desc_file: test_data_dir().join("test_proto.desc"),
                message_type: "test_proto.User".to_string(),
            },
        };

        let encoding = (
            Transformer::default(),
            crate::codecs::Encoder::<Framer>::new(
                LengthDelimitedEncoder::default().into(),
                config.build().unwrap().into(),
            ),
        );

        let mut writer = Vec::new();
        let input = vec![
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
            Event::Log(LogEvent::from(BTreeMap::from([
                (KeyString::from("id"), Value::from("123")),
                (KeyString::from("name"), Value::from("Alice")),
                (KeyString::from("age"), Value::from(30)),
                (
                    KeyString::from("emails"),
                    Value::from(vec!["alice@example.com", "alice@work.com"]),
                ),
            ]))),
        ];

        let input_json_size: JsonSize = input
            .iter()
            .map(|event| event.estimated_json_encoded_size_of())
            .sum();

        let (written, size) = encoding.encode_input(input, &mut writer).unwrap();

        assert_eq!(total_input_proto_size, 49 * 2);
        assert_eq!(written, total_input_proto_size + 8);
        assert_eq!(CountByteSize(2, input_json_size), size.size().unwrap());
        assert_eq!(Bytes::copy_from_slice(&writer), expected_bytes);
    }
}