1use bytes::BytesMut;
2use tokio_util::codec::Encoder as _;
3use vector_lib::codecs::{
4 CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig,
5 encoding::{Error, Framer, Serializer},
6};
7
8use crate::{
9 event::Event,
10 internal_events::{EncoderFramingError, EncoderSerializeError},
11};
12
13#[derive(Debug, Clone)]
14pub struct Encoder<Framer>
16where
17 Framer: Clone,
18{
19 framer: Framer,
20 serializer: Serializer,
21}
22
23impl Default for Encoder<Framer> {
24 fn default() -> Self {
25 Self {
26 framer: NewlineDelimitedEncoder::default().into(),
27 serializer: TextSerializerConfig::default().build().into(),
28 }
29 }
30}
31
32impl Default for Encoder<()> {
33 fn default() -> Self {
34 Self {
35 framer: (),
36 serializer: TextSerializerConfig::default().build().into(),
37 }
38 }
39}
40
41impl<Framer> Encoder<Framer>
42where
43 Framer: Clone,
44{
45 pub fn serialize(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
47 let len = buffer.len();
48 let mut payload = buffer.split_off(len);
49
50 self.serialize_at_start(event, &mut payload)?;
51
52 buffer.unsplit(payload);
53
54 Ok(())
55 }
56
57 fn serialize_at_start(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Error> {
59 self.serializer.encode(event, buffer).map_err(|error| {
60 emit!(EncoderSerializeError { error: &error });
61 Error::SerializingError(error)
62 })
63 }
64}
65
66impl Encoder<Framer> {
67 pub const fn new(framer: Framer, serializer: Serializer) -> Self {
71 Self { framer, serializer }
72 }
73
74 pub const fn framer(&self) -> &Framer {
76 &self.framer
77 }
78
79 pub const fn serializer(&self) -> &Serializer {
81 &self.serializer
82 }
83
84 pub const fn batch_prefix(&self) -> &[u8] {
86 match (&self.framer, &self.serializer) {
87 (
88 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
89 Serializer::Json(_) | Serializer::NativeJson(_),
90 ) => b"[",
91 _ => &[],
92 }
93 }
94
95 pub const fn batch_suffix(&self, empty: bool) -> &[u8] {
97 match (&self.framer, &self.serializer, empty) {
98 (
99 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
100 Serializer::Json(_) | Serializer::NativeJson(_),
101 _,
102 ) => b"]",
103 (Framer::NewlineDelimited(_), _, false) => b"\n",
104 _ => &[],
105 }
106 }
107
108 pub const fn content_type(&self) -> &'static str {
110 match (&self.serializer, &self.framer) {
111 (Serializer::Json(_) | Serializer::NativeJson(_), Framer::NewlineDelimited(_)) => {
112 "application/x-ndjson"
113 }
114 (
115 Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_),
116 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
117 ) => "application/json",
118 (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream",
119 (
120 Serializer::Avro(_)
121 | Serializer::Cef(_)
122 | Serializer::Csv(_)
123 | Serializer::Gelf(_)
124 | Serializer::Json(_)
125 | Serializer::Logfmt(_)
126 | Serializer::NativeJson(_)
127 | Serializer::RawMessage(_)
128 | Serializer::Text(_),
129 _,
130 ) => "text/plain",
131 #[cfg(feature = "codecs-opentelemetry")]
132 (Serializer::Otlp(_), _) => "application/x-protobuf",
133 }
134 }
135}
136
137impl Encoder<()> {
138 pub const fn new(serializer: Serializer) -> Self {
141 Self {
142 framer: (),
143 serializer,
144 }
145 }
146
147 pub const fn serializer(&self) -> &Serializer {
149 &self.serializer
150 }
151}
152
153impl tokio_util::codec::Encoder<Event> for Encoder<Framer> {
154 type Error = Error;
155
156 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
157 let len = buffer.len();
158 let mut payload = buffer.split_off(len);
159
160 self.serialize_at_start(event, &mut payload)?;
161
162 self.framer.encode((), &mut payload).map_err(|error| {
164 emit!(EncoderFramingError { error: &error });
165 Error::FramingError(error)
166 })?;
167
168 buffer.unsplit(payload);
169
170 Ok(())
171 }
172}
173
174impl tokio_util::codec::Encoder<Event> for Encoder<()> {
175 type Error = Error;
176
177 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
178 let len = buffer.len();
179 let mut payload = buffer.split_off(len);
180
181 self.serialize_at_start(event, &mut payload)?;
182
183 buffer.unsplit(payload);
184
185 Ok(())
186 }
187}
188
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use futures_util::{SinkExt, StreamExt};
    use tokio_util::codec::FramedWrite;
    use vector_lib::{codecs::encoding::BoxedFramingError, event::LogEvent};

    use super::*;

    /// Test framer that encloses each payload in parentheses.
    #[derive(Debug, Clone)]
    struct ParenEncoder;

    impl ParenEncoder {
        pub(super) const fn new() -> Self {
            Self
        }
    }

    impl tokio_util::codec::Encoder<()> for ParenEncoder {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.reserve(2);
            // Take the payload out, write the opening parenthesis, put the payload back
            // and finish with the closing parenthesis.
            let payload = dst.split();
            dst.put_u8(b'(');
            dst.unsplit(payload);
            dst.put_u8(b')');
            Ok(())
        }
    }

    /// Test framer that delegates to `inner` and returns an error on the call whose
    /// zero-based index equals `fail_at`. The inner framer has already run by the time
    /// the error is returned.
    #[derive(Debug, Clone)]
    struct ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        inner: T,
        calls: usize,
        fail_at: usize,
    }

    impl<T> ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        pub(super) const fn new(encoder: T, n: usize) -> Self {
            Self {
                inner: encoder,
                calls: 0,
                fail_at: n,
            }
        }
    }

    impl<T> tokio_util::codec::Encoder<()> for ErrorNthEncoder<T>
    where
        T: tokio_util::codec::Encoder<(), Error = BoxedFramingError>,
    {
        type Error = BoxedFramingError;

        fn encode(&mut self, _: (), dst: &mut BytesMut) -> Result<(), Self::Error> {
            self.inner.encode((), dst)?;

            // The call counter only advances when the inner framer succeeded.
            let call = self.calls;
            self.calls += 1;

            if call == self.fail_at {
                Err(Box::new(std::io::Error::other("error")) as _)
            } else {
                Ok(())
            }
        }
    }

    /// Framer that wraps every event in parentheses.
    fn paren_framer() -> Framer {
        Framer::Boxed(Box::new(ParenEncoder::new()))
    }

    /// Parenthesizing framer that fails on the call with zero-based index `n`.
    fn failing_paren_framer(n: usize) -> Framer {
        Framer::Boxed(Box::new(ErrorNthEncoder::new(ParenEncoder::new(), n)))
    }

    /// Encoder combining `framer` with the default text serializer.
    fn text_encoder(framer: Framer) -> Encoder<Framer> {
        Encoder::<Framer>::new(framer, TextSerializerConfig::default().build().into())
    }

    /// Builds one log event per message, in order.
    fn log_events(messages: [&'static str; 3]) -> Vec<Event> {
        messages
            .into_iter()
            .map(|message| Event::Log(LogEvent::from(message)))
            .collect()
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty() {
        let encoder = text_encoder(paren_framer());
        let events = futures::stream::iter(log_events(["foo", "bar", "baz"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        events.forward(&mut writer).await.unwrap();

        assert_eq!(writer.into_inner(), b"(foo)(bar)(baz)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty() {
        let encoder = text_encoder(paren_framer());
        let events = futures::stream::iter(log_events(["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        events.forward(&mut writer).await.unwrap();

        assert_eq!(writer.into_inner(), b"(foo)(bar)(baz)(bat)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_empty_handle_framing_error() {
        let encoder = text_encoder(failing_paren_framer(1));
        let events = futures::stream::iter(log_events(["foo", "bar", "baz"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        // Framing the second event fails; only the first one may reach the sink.
        assert!(events.forward(&mut writer).await.is_err());
        writer.flush().await.unwrap();

        assert_eq!(writer.into_inner(), b"(foo)");
    }

    #[tokio::test]
    async fn test_encode_events_sink_non_empty_handle_framing_error() {
        let encoder = text_encoder(failing_paren_framer(1));
        let events = futures::stream::iter(log_events(["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::from("(foo)"), encoder);

        // Framing the second event fails; the pre-existing bytes and the first event
        // are all that may end up in the sink.
        assert!(events.forward(&mut writer).await.is_err());
        writer.flush().await.unwrap();

        assert_eq!(writer.into_inner(), b"(foo)(bar)");
    }

    #[tokio::test]
    async fn test_encode_batch_newline() {
        let encoder = text_encoder(Framer::NewlineDelimited(
            NewlineDelimitedEncoder::default(),
        ));
        let events = futures::stream::iter(log_events(["bar", "baz", "bat"])).map(Ok);
        let mut writer = FramedWrite::new(Vec::<u8>::new(), encoder);

        events.forward(&mut writer).await.unwrap();

        assert_eq!(writer.into_inner(), b"bar\nbaz\nbat\n");
    }
}