1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_common::internal_event::emit;
4use vector_core::event::Event;
5
6#[cfg(feature = "arrow")]
7use crate::encoding::ArrowStreamSerializer;
8use crate::{
9 encoding::{Error, Framer, Serializer},
10 internal_events::{EncoderFramingError, EncoderSerializeError},
11};
12
/// Serializers that consume a whole batch of events in a single call.
///
/// The only variant is gated behind the `arrow` feature, so this enum has no
/// variants when that feature is disabled.
#[derive(Clone, Debug)]
pub enum BatchSerializer {
    /// Batch serialization backed by [`ArrowStreamSerializer`].
    #[cfg(feature = "arrow")]
    Arrow(ArrowStreamSerializer),
}
20
21#[derive(Debug, Clone)]
23pub struct BatchEncoder {
24 serializer: BatchSerializer,
25}
26
27impl BatchEncoder {
28 pub const fn new(serializer: BatchSerializer) -> Self {
30 Self { serializer }
31 }
32
33 pub const fn serializer(&self) -> &BatchSerializer {
35 &self.serializer
36 }
37
38 #[cfg(feature = "arrow")]
40 pub const fn content_type(&self) -> &'static str {
41 match &self.serializer {
42 BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
43 }
44 }
45}
46
47impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
48 type Error = Error;
49
50 #[allow(unused_variables)]
51 fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
52 #[allow(unreachable_patterns)]
53 match &mut self.serializer {
54 #[cfg(feature = "arrow")]
55 BatchSerializer::Arrow(serializer) => {
56 serializer.encode(events, buffer).map_err(|err| {
57 use crate::encoding::ArrowEncodingError;
58 match err {
59 ArrowEncodingError::NullConstraint { .. } => {
60 Error::SchemaConstraintViolation(Box::new(err))
61 }
62 _ => Error::SerializingError(Box::new(err)),
63 }
64 })
65 }
66 _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
73pub enum EncoderKind {
74 Framed(Box<Encoder<Framer>>),
76 #[cfg(feature = "arrow")]
78 Batch(BatchEncoder),
79}
80
81#[derive(Debug, Clone)]
82pub struct Encoder<Framer>
84where
85 Framer: Clone,
86{
87 framer: Framer,
88 serializer: Serializer,
89}
90
91impl Default for Encoder<Framer> {
92 fn default() -> Self {
93 use crate::encoding::{NewlineDelimitedEncoder, TextSerializerConfig};
94
95 Self {
96 framer: NewlineDelimitedEncoder::default().into(),
97 serializer: TextSerializerConfig::default().build().into(),
98 }
99 }
100}
101
102impl Default for Encoder<()> {
103 fn default() -> Self {
104 use crate::encoding::TextSerializerConfig;
105
106 Self {
107 framer: (),
108 serializer: TextSerializerConfig::default().build().into(),
109 }
110 }
111}
112
113impl<Framer> Encoder<Framer>
114where
115 Framer: Clone,
116{
117 pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
119 let len = buffer.len();
120 let mut payload = buffer.split_off(len);
121
122 self.serialize_at_start(event, &mut payload)?;
123
124 buffer.unsplit(payload);
125
126 Ok(())
127 }
128
129 fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
131 self.serializer.encode(event, buffer).map_err(|error| {
132 emit(EncoderSerializeError { error: &error });
133 Error::SerializingError(error)
134 })
135 }
136}
137
138impl Encoder<Framer> {
139 pub const fn new(framer: Framer, serializer: Serializer) -> Self {
143 Self { framer, serializer }
144 }
145
146 pub const fn framer(&self) -> &Framer {
148 &self.framer
149 }
150
151 pub const fn serializer(&self) -> &Serializer {
153 &self.serializer
154 }
155
156 pub const fn batch_prefix(&self) -> &[u8] {
158 match (&self.framer, &self.serializer) {
159 (
160 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
161 delimiter: b',',
162 }),
163 Serializer::Json(_) | Serializer::NativeJson(_),
164 ) => b"[",
165 _ => &[],
166 }
167 }
168
169 pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
171 match (&self.framer, &self.serializer, empty) {
172 (
173 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
174 delimiter: b',',
175 }),
176 Serializer::Json(_) | Serializer::NativeJson(_),
177 _,
178 ) => b"]",
179 (Framer::NewlineDelimited(_), _, false) => b"\n",
180 _ => &[],
181 }
182 }
183
184 pub const fn content_type(&self) -> &'static str {
186 match (&self.serializer, &self.framer) {
187 (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
188 "application/x-ndjson"
189 }
190 (
191 Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
192 Framer::CharacterDelimited(crate::encoding::CharacterDelimitedEncoder {
193 delimiter: b',',
194 }),
195 ) => "application/json",
196 (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
197 (
198 Serializer::Avro(_)
199 | Serializer::Cef(_)
200 | Serializer::Csv(_)
201 | Serializer::Gelf(_)
202 | Serializer::Json(_)
203 | Serializer::Logfmt(_)
204 | Serializer::NativeJson(_)
205 | Serializer::RawMessage(_)
206 | Serializer::Text(_),
207 _,
208 ) => "text/plain",
209 #[cfg(feature = "syslog")]
210 (Serializer::Syslog(_), _) => "text/plain",
211 #[cfg(feature = "opentelemetry")]
212 (Serializer::Otlp(_), _) => "application/x-protobuf",
213 }
214 }
215}
216
217impl Encoder<()> {
218 pub const fn new(serializer: Serializer) -> Self {
221 Self {
222 framer: (),
223 serializer,
224 }
225 }
226
227 pub const fn serializer(&self) -> &Serializer {
229 &self.serializer
230 }
231}
232
233impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
234 type Error = Error;
235
236 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
237 let len = buffer.len();
238 let mut payload = buffer.split_off(len);
239
240 self.serialize_at_start(event, &mut payload)?;
241
242 self.framer.encode((), &mut payload).map_err(|error| {
244 emit(EncoderFramingError { error: &error });
245 Error::FramingError(error)
246 })?;
247
248 buffer.unsplit(payload);
249
250 Ok(())
251 }
252}
253
254impl tokio_util::codec::Encoder<Event> for Encoder<()> {
255 type Error = Error;
256
257 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
258 let len = buffer.len();
259 let mut payload = buffer.split_off(len);
260
261 self.serialize_at_start(event, &mut payload)?;
262
263 buffer.unsplit(payload);
264
265 Ok(())
266 }
267}
268
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_core::event::LogEvent;

    use super::*;
    use crate::encoding::BoxedFramingError;

    /// Test framer that wraps the current contents of the destination in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the existing bytes out, write the opening paren, then put them back.
            let body = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(body);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` and then fails on the call whose
    /// zero-based index equals `fail_at`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self {
                inner: encoder,
                calls: 0,
                fail_at: n,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            self.inner.encode((), dst)?;

            // The counter only advances when the inner encoder succeeded.
            let call = self.calls;
            self.calls += 1;

            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that pairs `framer` with the default text serializer.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(
            framer,
            crate::encoding::TextSerializerConfig::default()
                .build()
                .into(),
        )
    }

    /// Builds a stream of successful log events, one per message, in order.
    fn log_events(
        messages: [&'static str; 3],
    ) -> impl futures::Stream<Item = Result<Event, Error>> {
        futures::stream::iter(messages).map(|message| Ok(Event::Log(LogEvent::from(message))))
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        log_events(["foo", "bar", "baz"])
            .forward(&mut framed)
            .await
            .unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);

        log_events(["bar", "baz", "bat"])
            .forward(&mut framed)
            .await
            .unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        let forwarded = log_events(["foo", "bar", "baz"])
            .forward(&mut framed)
            .await;
        assert!(forwarded.is_err());

        framed.flush().await.unwrap();
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let failing = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing)));
        let mut framed = FramedWrite::new(Vec::from("(foo)"), encoder);

        let forwarded = log_events(["bar", "baz", "bat"])
            .forward(&mut framed)
            .await;
        assert!(forwarded.is_err());

        framed.flush().await.unwrap();
        let written = framed.into_inner();
        assert_eq!(written, b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            crate::encoding::NewlineDelimitedEncoder::default(),
        ));
        let mut framed = FramedWrite::new(Vec::<u8>::new(), encoder);

        log_events(["bar", "baz", "bat"])
            .forward(&mut framed)
            .await
            .unwrap();

        let written = framed.into_inner();
        assert_eq!(written, b"bar\nbaz\nbat\n");
    }
}