1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8#[cfg(feature = "parquet")]
9use crate::encoding::ParquetSerializer;
10use crate::{
11 encoding::{Error, Framer, Serializer},
12 internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// The serializer backing a [`BatchEncoder`], with one variant per batch
/// format compiled into this build.
///
/// Every variant is gated on a Cargo feature, so a build with neither `arrow`
/// nor `parquet` enabled leaves this enum without any variants.
#[derive(Debug, Clone)]
pub enum BatchSerializer {
    /// Arrow stream serializer; only present with the `arrow` feature.
    #[cfg(feature = "arrow")]
    Arrow(ArrowStreamSerializer),
    /// Parquet serializer, stored boxed; only present with the `parquet`
    /// feature.
    #[cfg(feature = "parquet")]
    Parquet(Box<ParquetSerializer>),
}
25
/// An encoder that turns a whole `Vec<Event>` into bytes in a single call by
/// delegating to the wrapped [`BatchSerializer`].
#[derive(Debug, Clone)]
pub struct BatchEncoder {
    /// The format-specific serializer every batch is handed to.
    serializer: BatchSerializer,
}
31
32impl BatchEncoder {
33 pub const fn new(serializer: BatchSerializer) -> Self {
35 Self { serializer }
36 }
37
38 pub const fn serializer(&self) -> &BatchSerializer {
40 &self.serializer
41 }
42
43 #[cfg(any(feature = "arrow", feature = "parquet"))]
45 pub const fn content_type(&self) -> &'static str {
46 match &self.serializer {
47 BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
48 #[cfg(feature = "parquet")]
49 BatchSerializer::Parquet(_) => "application/vnd.apache.parquet",
50 }
51 }
52}
53
54impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
55 type Error = Error;
56
57 #[allow(unused_variables)]
58 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
59 #[allow(unreachable_patterns)]
60 match &mut self.serializer {
61 #[cfg(feature = "arrow")]
62 BatchSerializer::Arrow(serializer) => {
63 serializer.encode(events, buffer).map_err(|err| {
64 use crate::encoding::ArrowEncodingError;
65 match err {
66 ArrowEncodingError::NullConstraint { .. } => {
67 Error::SchemaConstraintViolation(Box::new(err))
68 }
69 _ => Error::SerializingError(Box::new(err)),
70 }
71 })
72 }
73 #[cfg(feature = "parquet")]
74 BatchSerializer::Parquet(serializer) => serializer
75 .encode(events, buffer)
76 .map_err(Error::SerializingError),
77 #[allow(unreachable_patterns)]
78 _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
79 }
80 }
81}
82
/// Either a per-event framed encoder or a whole-batch encoder.
#[derive(Debug, Clone)]
pub enum EncoderKind {
    /// An [`Encoder`] using the concrete `Framer` enum, stored boxed.
    Framed(Box<Encoder<Framer>>),
    /// A [`BatchEncoder`]; only present when the `arrow` or `parquet` feature
    /// is enabled.
    #[cfg(any(feature = "arrow", feature = "parquet"))]
    Batch(BatchEncoder),
}
92
/// Pairs a [`Serializer`] with a framer to turn single events into bytes.
///
/// NOTE: the type parameter is itself named `Framer`, so inside this
/// definition it shadows the concrete `Framer` enum imported at the top of the
/// file. This file instantiates the parameter with that enum
/// (`Encoder<Framer>`) and with the unit type (`Encoder<()>`, no framing).
#[derive(Debug, Clone)]
pub struct Encoder<Framer>
where
    Framer: Clone,
{
    /// The framer; `()` when no framing is applied.
    framer: Framer,
    /// The serializer that writes each event's bytes.
    serializer: Serializer,
}
102
103impl Default for Encoder<Framer> {
104 fn default() -> Self {
105 use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
106
107 Self {
108 framer: NewlineDelimitedEncoder::default().into(),
109 serializer: TextSerializerConfig::default().build().into(),
110 }
111 }
112}
113
114impl Default for Encoder<()> {
115 fn default() -> Self {
116 use crate::encoding::TextSerializerConfig;
117
118 Self {
119 framer: (),
120 serializer: TextSerializerConfig::default().build().into(),
121 }
122 }
123}
124
125impl<Framer> Encoder<Framer>
126where
127 Framer: Clone,
128{
129 pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
131 let len = buffer.len();
132 let mut payload = buffer.split_off(len);
133
134 self.serialize_at_start(event, &mut payload)?;
135
136 buffer.unsplit(payload);
137
138 Ok(())
139 }
140
141 fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
143 self.serializer.encode(event, buffer).map_err(|error| {
144 emit(EncoderSerializeError { error: &error });
145 Error::SerializingError(error)
146 })
147 }
148}
149
150impl Encoder<Framer> {
151 pub const fn new(framer: Framer, serializer: Serializer) -> Self {
155 Self { framer, serializer }
156 }
157
158 pub const fn framer(&self) -> &Framer {
160 &self.framer
161 }
162
163 pub const fn serializer(&self) -> &Serializer {
165 &self.serializer
166 }
167
168 pub const fn batch_prefix(&self) -> &[u8] {
170 match (&self.framer, &self.serializer) {
171 (
172 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
173 delimiter: b',',
174 }),
175 Serializer::Json(_) | Serializer::NativeJson(_),
176 ) => b"[",
177 _ => &[],
178 }
179 }
180
181 pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
183 match (&self.framer, &self.serializer, empty) {
184 (
185 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
186 delimiter: b',',
187 }),
188 Serializer::Json(_) | Serializer::NativeJson(_),
189 _,
190 ) => b"]",
191 (Framer::NewlineDelimited(_), _, false) => b"\n",
192 _ => &[],
193 }
194 }
195
196 pub const fn content_type(&self) -> &'static str {
198 match (&self.serializer, &self.framer) {
199 (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
200 "application/x-ndjson"
201 }
202 (
203 Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
204 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
205 delimiter: b',',
206 }),
207 ) => "application/json",
208 (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
209 (
210 Serializer::Avro(_)
211 | Serializer::Cef(_)
212 | Serializer::Csv(_)
213 | Serializer::Gelf(_)
214 | Serializer::Json(_)
215 | Serializer::Logfmt(_)
216 | Serializer::NativeJson(_)
217 | Serializer::RawMessage(_)
218 | Serializer::Text(_),
219 _,
220 ) => "text/plain",
221 #[cfg(feature = "syslog")]
222 (Serializer::Syslog(_), _) => "text/plain",
223 #[cfg(feature = "opentelemetry")]
224 (Serializer::Otlp(_), _) => "application/x-protobuf",
225 }
226 }
227}
228
229impl Encoder<()> {
230 pub const fn new(serializer: Serializer) -> Self {
233 Self {
234 framer: (),
235 serializer,
236 }
237 }
238
239 pub const fn serializer(&self) -> &Serializer {
241 &self.serializer
242 }
243}
244
245impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
246 type Error = Error;
247
248 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
249 let len = buffer.len();
250 let mut payload = buffer.split_off(len);
251
252 self.serialize_at_start(event, &mut payload)?;
253
254 self.framer.encode((), &mut payload).map_err(|error| {
256 emit(EncoderFramingError { error: &error });
257 Error::FramingError(error)
258 })?;
259
260 buffer.unsplit(payload);
261
262 Ok(())
263 }
264}
265
266impl tokio_util::codec::Encoder<Event> for Encoder<()> {
267 type Error = Error;
268
269 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
270 let len = buffer.len();
271 let mut payload = buffer.split_off(len);
272
273 self.serialize_at_start(event, &mut payload)?;
274
275 buffer.unsplit(payload);
276
277 Ok(())
278 }
279}
280
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::{BoxedFramingError, NewlineDelimitedEncoder, TextSerializerConfig};

    /// Test framer that wraps whatever is currently in the buffer in
    /// parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the existing bytes out, write the opening paren, put the
            // bytes back behind it, then close.
            let body = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(body);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to an inner framer and reports an error on
    /// the call whose zero-based index equals the configured `n`.
    ///
    /// Fields: inner framer, number of calls seen so far, index that fails.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>(T, usize, usize)
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>;

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self(encoder, 0, n)
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            let Self(inner, seen, fail_at) = self;

            // The inner framer always runs first; an inner failure returns
            // early without advancing the call counter.
            inner.encode((), dst)?;

            let should_fail = *seen == *fail_at;
            *seen += 1;

            if should_fail {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that pairs `framer` with the default text serializer.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(framer, TextSerializerConfig::default().build().into())
    }

    /// Builds one log event per message, in order.
    fn log_events(messages: &[&'static str]) -> Vec<Event> {
        messages
            .iter()
            .map(|message| Event::Log(LogEvent::from(*message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let source = futures::stream::iter(log_events(&["foo", "bar", "baz"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        let forwarded = source.forward(&mut framed).await;
        assert!(forwarded.is_err());
        framed.flush().await.unwrap();

        // Only the event framed before the failure reaches the sink.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);
        let forwarded = source.forward(&mut framed).await;
        assert!(forwarded.is_err());
        framed.flush().await.unwrap();

        // The pre-existing sink content plus the one event framed before the
        // failure.
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            NewlineDelimitedEncoder::default(),
        ));
        let source = futures::stream::iter(log_events(&["bar", "baz", "bat"])).map(Ok);

        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);
        source.forward(&mut framed).await.unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"bar\nbaz\nbat\n");
    }
}
451}