1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3#[cfg(feature = "codecs-arrow")]
4use vector_lib::codecs::encoding::ArrowStreamSerializer;
5use vector_lib::codecs::{
6 CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
7 encoding::{Error, Framer, Serializer},
8};
9
10use crate::{
11 event::Event,
12 internal_events::{EncoderFramingError, EncoderSerializeError},
13};
14
/// Serializers that turn a whole `Vec<Event>` into bytes in a single call.
///
/// Every variant is feature-gated, so the enum has no variants (and cannot be constructed) when
/// none of those features is enabled.
#[derive(Debug, Clone)]
pub enum BatchSerializer {
    /// Serializer backed by `ArrowStreamSerializer`; only compiled with the `codecs-arrow` feature.
    #[cfg(feature = "codecs-arrow")]
    Arrow(ArrowStreamSerializer),
}
22
/// Encoder that hands a whole batch of events to a [`BatchSerializer`].
#[derive(Debug, Clone)]
pub struct BatchEncoder {
    // The serializer that `encode` delegates each batch to.
    serializer: BatchSerializer,
}
28
29impl BatchEncoder {
30 pub const fn new(serializer: BatchSerializer) -> Self {
32 Self { serializer }
33 }
34
35 pub const fn serializer(&self) -> &BatchSerializer {
37 &self.serializer
38 }
39
40 #[cfg(feature = "codecs-arrow")]
42 pub const fn content_type(&self) -> &'static str {
43 match &self.serializer {
44 BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream",
45 }
46 }
47}
48
/// Encodes a whole `Vec<Event>` into the output buffer by delegating to the wrapped
/// [`BatchSerializer`].
impl tokio_util::codec::Encoder<Vec<Event>> for BatchEncoder {
    type Error = Error;

    // `events` and `buffer` are only used inside the feature-gated arm below, so they are unused
    // when `codecs-arrow` is disabled.
    #[allow(unused_variables)]
    fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        // With `codecs-arrow` enabled, the `Arrow` arm already covers the only variant, which makes
        // the trailing catch-all arm an unreachable pattern.
        #[allow(unreachable_patterns)]
        match &mut self.serializer {
            #[cfg(feature = "codecs-arrow")]
            BatchSerializer::Arrow(serializer) => {
                serializer.encode(events, buffer).map_err(|err| {
                    use vector_lib::codecs::encoding::ArrowEncodingError;
                    // Null-constraint failures are surfaced as schema constraint violations; every
                    // other Arrow error is reported as a generic serializing error.
                    match err {
                        ArrowEncodingError::NullConstraint { .. } => {
                            Error::SchemaConstraintViolation(Box::new(err))
                        }
                        _ => Error::SerializingError(Box::new(err)),
                    }
                })
            }
            // Keeps the match well-formed when every variant is compiled out; in that
            // configuration `BatchSerializer` has no variants and cannot be constructed.
            _ => unreachable!("BatchSerializer cannot be constructed without encode()"),
        }
    }
}
72
/// Either a per-event framed encoder or a whole-batch encoder.
#[derive(Debug, Clone)]
pub enum EncoderKind {
    /// Per-event encoder pairing a `Framer` with a `Serializer`, stored behind a `Box`.
    Framed(Box<Encoder<Framer>>),
    /// Batch encoder; only compiled with the `codecs-arrow` feature.
    #[cfg(feature = "codecs-arrow")]
    Batch(BatchEncoder),
}
82
/// An encoder that serializes events with a `Serializer` and carries a framer of type `Framer`.
///
/// Note that within this definition the generic parameter `Framer` shadows the imported
/// `Framer` type; elsewhere in this file `Encoder<Framer>` (without a generic parameter list on
/// the `impl`) refers to the concrete imported type, and `Encoder<()>` is the framer-less form.
#[derive(Debug, Clone)]
pub struct Encoder<Framer>
where
    Framer: Clone,
{
    // Framing state; `()` when no framing is applied.
    framer: Framer,
    // Serializer used to turn a single event into bytes.
    serializer: Serializer,
}
92
93impl Default for Encoder<Framer> {
94 fn default() -> Self {
95 Self {
96 framer: NewlineDelimitedEncoder::default().into(),
97 serializer: TextSerializerConfig::default().build().into(),
98 }
99 }
100}
101
102impl Default for Encoder<()> {
103 fn default() -> Self {
104 Self {
105 framer: (),
106 serializer: TextSerializerConfig::default().build().into(),
107 }
108 }
109}
110
111impl<Framer> Encoder<Framer>
112where
113 Framer: Clone,
114{
115 pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
117 let len = buffer.len();
118 let mut payload = buffer.split_off(len);
119
120 self.serialize_at_start(event, &mut payload)?;
121
122 buffer.unsplit(payload);
123
124 Ok(())
125 }
126
127 fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
129 self.serializer.encode(event, buffer).map_err(|error| {
130 emit!(EncoderSerializeError { error: &error });
131 Error::SerializingError(error)
132 })
133 }
134}
135
/// Constructors, accessors and batch/content-type helpers for the framed encoder.
impl Encoder<Framer> {
    /// Creates an encoder from the given framer and serializer.
    pub const fn new(framer: Framer, serializer: Serializer) -> Self {
        Self { framer, serializer }
    }

    /// Borrows the framer.
    pub const fn framer(&self) -> &Framer {
        &self.framer
    }

    /// Borrows the serializer.
    pub const fn serializer(&self) -> &Serializer {
        &self.serializer
    }

    /// Returns the prefix bytes for a batch.
    ///
    /// This is `[` when events are comma-delimited and serialized as JSON or native JSON, and
    /// empty for every other framer/serializer combination.
    pub const fn batch_prefix(&self) -> &[u8] {
        match (&self.framer, &self.serializer) {
            (
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
                Serializer::Json(_) | Serializer::NativeJson(_),
            ) => b"[",
            _ => &[],
        }
    }

    /// Returns the suffix bytes for a batch.
    ///
    /// - `]` for comma-delimited JSON / native JSON, regardless of `empty`.
    /// - `\n` for newline-delimited framing when `empty` is `false`.
    /// - Empty in every other case.
    ///
    /// `empty` presumably tells whether the batch contained no events; confirm against callers.
    pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
        match (&self.framer, &self.serializer, empty) {
            (
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
                Serializer::Json(_) | Serializer::NativeJson(_),
                _,
            ) => b"]",
            (Framer::NewlineDelimited(_), _, false) => b"\n",
            _ => &[],
        }
    }

    /// Returns the content type string for the current serializer/framer combination.
    ///
    /// Arm order matters: the specific JSON combinations are matched first, and the broad
    /// `text/plain` arm only sees the serializers that were not claimed by an earlier arm.
    pub const fn content_type(&self) -> &'static str {
        match (&self.serializer, &self.framer) {
            // JSON documents separated by newlines.
            (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
                "application/x-ndjson"
            }
            // JSON-shaped serializers separated by commas.
            (
                Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
                Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
            ) => "application/json",
            (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
            // Fallback for the listed serializers with any framer not handled above.
            (
                Serializer::Avro(_)
                | Serializer::Cef(_)
                | Serializer::Csv(_)
                | Serializer::Gelf(_)
                | Serializer::Json(_)
                | Serializer::Logfmt(_)
                | Serializer::NativeJson(_)
                | Serializer::RawMessage(_)
                | Serializer::Text(_),
                _,
            ) => "text/plain",
            #[cfg(feature = "codecs-opentelemetry")]
            (Serializer::Otlp(_), _) => "application/x-protobuf",
        }
    }
}
206
207impl Encoder<()> {
208 pub const fn new(serializer: Serializer) -> Self {
211 Self {
212 framer: (),
213 serializer,
214 }
215 }
216
217 pub const fn serializer(&self) -> &Serializer {
219 &self.serializer
220 }
221}
222
223impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
224 type Error = Error;
225
226 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
227 let len = buffer.len();
228 let mut payload = buffer.split_off(len);
229
230 self.serialize_at_start(event, &mut payload)?;
231
232 self.framer.encode((), &mut payload).map_err(|error| {
234 emit!(EncoderFramingError { error: &error });
235 Error::FramingError(error)
236 })?;
237
238 buffer.unsplit(payload);
239
240 Ok(())
241 }
242}
243
244impl tokio_util::codec::Encoder<Event> for Encoder<()> {
245 type Error = Error;
246
247 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
248 let len = buffer.len();
249 let mut payload = buffer.split_off(len);
250
251 self.serialize_at_start(event, &mut payload)?;
252
253 buffer.unsplit(payload);
254
255 Ok(())
256 }
257}
258
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures_util::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_lib::{codecs::encoding::BoxedFramingError, event::LogEvent};

    use super::*;

    /// Test framer that wraps whatever is currently in the buffer in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Detach the payload, write the opening paren, then re-attach the payload behind it.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to the wrapped encoder and returns an error on the call whose
    /// zero-based index equals `n`. Fields are: wrapped encoder, calls seen so far, `n`.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>(T, usize, usize)
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>;

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self(encoder, 0, n)
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            // The wrapped encoder always runs first, even on the call that is about to fail.
            self.0.encode((), dst)?;

            let fail_this_call = self.1 == self.2;
            self.1 += 1;

            if fail_this_call {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Builds an encoder that pairs `framer` with the default text serializer.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(framer, TextSerializerConfig::default().build().into())
    }

    /// Turns each message into a log event, preserving order.
    fn log_events(messages: &[&'static str]) -> Vec<Event> {
        messages
            .iter()
            .map(|&message| Event::Log(LogEvent::from(message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        futures::stream::iter(log_events(&["foo", "bar", "baz"]))
            .map(Ok)
            .forward(&mut writer)
            .await
            .unwrap();

        assert_eq!(writer.into_inner(), b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(Framer::Boxed(Box::new(ParenEncoder::new())));
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut writer)
            .await
            .unwrap();

        assert_eq!(writer.into_inner(), b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let failing_framer = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing_framer)));
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        let outcome = futures::stream::iter(log_events(&["foo", "bar", "baz"]))
            .map(Ok)
            .forward(&mut writer)
            .await;
        assert!(outcome.is_err());

        // Only the event framed before the failure should reach the sink.
        writer.flush().await.unwrap();
        assert_eq!(writer.into_inner(), b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let failing_framer = ErrorNthEncoder::new(ParenEncoder::new(), 1);
        let encoder = text_encoder(Framer::Boxed(Box::new(failing_framer)));
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        let outcome = futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut writer)
            .await;
        assert!(outcome.is_err());

        // The pre-existing sink contents plus the one successfully framed event remain.
        writer.flush().await.unwrap();
        assert_eq!(writer.into_inner(), b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            NewlineDelimitedEncoder::default(),
        ));
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        futures::stream::iter(log_events(&["bar", "baz", "bat"]))
            .map(Ok)
            .forward(&mut writer)
            .await
            .unwrap();

        assert_eq!(writer.into_inner(), b"bar\nbaz\nbat\n");
    }
}