codecs/decoding/framing/
newline_delimited.rs1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::{BoxedFramingError, CharacterDelimitedDecoder};
7
8#[configurable_component]
10#[derive(Debug, Clone, Default, PartialEq, Eq)]
11pub struct NewlineDelimitedDecoderConfig {
12 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
14 pub newline_delimited: NewlineDelimitedDecoderOptions,
15}
16
17#[configurable_component]
19#[derive(Clone, Debug, Derivative, PartialEq, Eq)]
20#[derivative(Default)]
21pub struct NewlineDelimitedDecoderOptions {
22 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
34 pub max_length: Option<usize>,
35}
36
37impl NewlineDelimitedDecoderOptions {
38 pub const fn new_with_max_length(max_length: usize) -> Self {
40 Self {
41 max_length: Some(max_length),
42 }
43 }
44}
45
46impl NewlineDelimitedDecoderConfig {
47 pub fn new() -> Self {
49 Default::default()
50 }
51
52 pub const fn new_with_max_length(max_length: usize) -> Self {
54 Self {
55 newline_delimited: { NewlineDelimitedDecoderOptions::new_with_max_length(max_length) },
56 }
57 }
58
59 pub const fn build(&self) -> NewlineDelimitedDecoder {
61 if let Some(max_length) = self.newline_delimited.max_length {
62 NewlineDelimitedDecoder::new_with_max_length(max_length)
63 } else {
64 NewlineDelimitedDecoder::new()
65 }
66 }
67}
68
69#[derive(Debug, Clone)]
71pub struct NewlineDelimitedDecoder(CharacterDelimitedDecoder);
72
73impl NewlineDelimitedDecoder {
74 pub const fn new() -> Self {
76 Self(CharacterDelimitedDecoder::new(b'\n'))
77 }
78
79 pub const fn new_with_max_length(max_length: usize) -> Self {
83 Self(CharacterDelimitedDecoder::new_with_max_length(
84 b'\n', max_length,
85 ))
86 }
87}
88
89impl Default for NewlineDelimitedDecoder {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl Decoder for NewlineDelimitedDecoder {
96 type Item = Bytes;
97 type Error = BoxedFramingError;
98
99 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
100 self.0.decode(src)
101 }
102
103 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
104 self.0.decode_eof(src)
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use super::*;
111
112 #[test]
113 fn decode_bytes_with_newlines() {
114 let mut input = BytesMut::from("foo\nbar\nbaz");
115 let mut decoder = NewlineDelimitedDecoder::new();
116
117 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
118 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
119 assert_eq!(decoder.decode(&mut input).unwrap(), None);
120 }
121
122 #[test]
123 fn decode_bytes_with_newlines_trailing() {
124 let mut input = BytesMut::from("foo\nbar\nbaz\n");
125 let mut decoder = NewlineDelimitedDecoder::new();
126
127 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
128 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
129 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "baz");
130 assert_eq!(decoder.decode(&mut input).unwrap(), None);
131 }
132
133 #[test]
134 fn decode_bytes_with_newlines_and_max_length() {
135 let mut input = BytesMut::from("foo\nbarbara\nbaz\n");
136 let mut decoder = NewlineDelimitedDecoder::new_with_max_length(3);
137
138 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
139 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "baz");
140 assert_eq!(decoder.decode(&mut input).unwrap(), None);
141 }
142
143 #[test]
144 fn decode_eof_bytes_with_newlines() {
145 let mut input = BytesMut::from("foo\nbar\nbaz");
146 let mut decoder = NewlineDelimitedDecoder::new();
147
148 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
149 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
150 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
151 }
152
153 #[test]
154 fn decode_eof_bytes_with_newlines_trailing() {
155 let mut input = BytesMut::from("foo\nbar\nbaz\n");
156 let mut decoder = NewlineDelimitedDecoder::new();
157
158 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
159 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
160 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
161 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
162 }
163
164 #[test]
165 fn decode_eof_bytes_with_newlines_and_max_length() {
166 let mut input = BytesMut::from("foo\nbarbara\nbaz\n");
167 let mut decoder = NewlineDelimitedDecoder::new_with_max_length(3);
168
169 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
170 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "baz");
171 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
172 }
173}