1use bytes::{Buf, Bytes, BytesMut};
2use memchr::memchr;
3use tokio_util::codec::Decoder;
4use tracing::{trace, warn};
5use vector_config::configurable_component;
6
7use super::BoxedFramingError;
8
/// Config used to build a `CharacterDelimitedDecoder`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CharacterDelimitedDecoderConfig {
    /// Options for the character delimited decoder.
    pub character_delimited: CharacterDelimitedDecoderOptions,
}
16
17impl CharacterDelimitedDecoderConfig {
18 pub const fn new(delimiter: u8) -> Self {
20 Self {
21 character_delimited: CharacterDelimitedDecoderOptions::new(delimiter, None),
22 }
23 }
24 pub const fn build(&self) -> CharacterDelimitedDecoder {
26 if let Some(max_length) = self.character_delimited.max_length {
27 CharacterDelimitedDecoder::new_with_max_length(
28 self.character_delimited.delimiter,
29 max_length,
30 )
31 } else {
32 CharacterDelimitedDecoder::new(self.character_delimited.delimiter)
33 }
34 }
35}
36
/// Options for building a `CharacterDelimitedDecoder`.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CharacterDelimitedDecoderOptions {
    /// The character that delimits byte sequences.
    #[configurable(metadata(docs::type_override = "ascii_char"))]
    #[serde(with = "vector_core::serde::ascii_char")]
    pub delimiter: u8,

    /// The maximum length of a frame, in bytes.
    ///
    /// This length does *not* include the trailing delimiter.
    ///
    /// When unset, the decoder is built without an explicit limit.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub max_length: Option<usize>,
}
60
61impl CharacterDelimitedDecoderOptions {
62 pub const fn new(delimiter: u8, max_length: Option<usize>) -> Self {
64 Self {
65 delimiter,
66 max_length,
67 }
68 }
69}
70
/// A decoder for byte sequences that are separated by a single delimiter byte.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CharacterDelimitedDecoder {
    /// The byte that separates frames.
    pub delimiter: u8,
    /// The maximum accepted frame length in bytes, not counting the delimiter.
    pub max_length: usize,
}

impl CharacterDelimitedDecoder {
    /// Creates a decoder for `delimiter` whose maximum frame length is `usize::MAX`.
    pub const fn new(delimiter: u8) -> Self {
        Self::new_with_max_length(delimiter, usize::MAX)
    }

    /// Creates a decoder for `delimiter` that accepts frames of at most `max_length` bytes.
    pub const fn new_with_max_length(delimiter: u8, max_length: usize) -> Self {
        Self {
            delimiter,
            max_length,
        }
    }

    /// Returns the maximum frame length this decoder was created with.
    pub const fn max_length(&self) -> usize {
        self.max_length
    }
}
104
105impl Decoder for CharacterDelimitedDecoder {
106 type Item = Bytes;
107 type Error = BoxedFramingError;
108
109 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
110 loop {
111 match memchr(self.delimiter, buf) {
118 None => return Ok(None),
119 Some(next_delimiter_idx) => {
120 if next_delimiter_idx > self.max_length {
121 warn!(
124 message = "Discarding frame larger than max_length.",
125 buf_len = buf.len(),
126 max_length = self.max_length
127 );
128 buf.advance(next_delimiter_idx + 1);
129 } else {
130 let frame = buf.split_to(next_delimiter_idx).freeze();
131 trace!(
132 message = "Decoding the frame.",
133 bytes_processed = frame.len()
134 );
135 buf.advance(1); return Ok(Some(frame));
137 }
138 }
139 }
140 }
141 }
142
143 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
144 match self.decode(buf)? {
145 Some(frame) => Ok(Some(frame)),
146 None => {
147 if buf.is_empty() {
148 Ok(None)
149 } else if buf.len() > self.max_length {
150 warn!(
151 message = "Discarding frame larger than max_length.",
152 buf_len = buf.len(),
153 max_length = self.max_length
154 );
155 Ok(None)
156 } else {
157 let bytes: Bytes = buf.split_to(buf.len()).freeze();
158 Ok(Some(bytes))
159 }
160 }
161 }
162 }
163}
164
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::BufMut;
    use indoc::indoc;

    use super::*;

    /// A delimited frame is returned without its trailing delimiter.
    #[test]
    fn decode() {
        let mut decoder = CharacterDelimitedDecoder::new(b'\n');
        let mut buffer = BytesMut::new();
        buffer.put_slice(b"abc\n");

        assert_eq!(
            Some(Bytes::from("abc")),
            decoder.decode(&mut buffer).unwrap()
        );
    }

    /// Frames longer than the limit are skipped; an undelimited tail is only yielded at EOF.
    #[test]
    fn decode_max_length() {
        const MAX_LENGTH: usize = 6;
        const INPUT: &[u8] = b"1234567\n123456\n123412314\n123";

        let mut decoder = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);

        // Streaming: the 7- and 9-byte frames exceed the limit, and the trailing "123" has no
        // delimiter yet, so only "123456" comes out.
        let mut streamed = BytesMut::new();
        streamed.put_slice(INPUT);

        assert_eq!(
            decoder.decode(&mut streamed).unwrap(),
            Some(Bytes::from("123456"))
        );
        assert_eq!(decoder.decode(&mut streamed).unwrap(), None);

        // End of input: the trailing "123" is additionally yielded as the final frame.
        let mut finished = BytesMut::new();
        finished.put_slice(INPUT);

        assert_eq!(
            decoder.decode_eof(&mut finished).unwrap(),
            Some(Bytes::from("123456"))
        );
        assert_eq!(
            decoder.decode_eof(&mut finished).unwrap(),
            Some(Bytes::from("123"))
        );
        assert_eq!(decoder.decode_eof(&mut finished).unwrap(), None);
    }

    /// Input that is longer than the limit but holds no delimiter must return `None` on every
    /// call rather than loop.
    #[test]
    fn decode_discard_repeat() {
        const MAX_LENGTH: usize = 1;

        let mut decoder = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let mut buffer = BytesMut::new();

        buffer.reserve(200);
        buffer.put_slice(b"aa");
        assert!(decoder.decode(&mut buffer).unwrap().is_none());
        buffer.put_slice(b"a");
        assert!(decoder.decode(&mut buffer).unwrap().is_none());
    }

    /// A serialized JSON object followed by one newline is consumed as a single frame.
    #[test]
    fn decode_json_escaped() {
        let mut input = HashMap::new();
        input.insert("key", "value");
        input.insert("new", "li\nne");

        let mut payload = serde_json::to_vec(&input).unwrap();
        payload.push(b'\n');

        let mut decoder = CharacterDelimitedDecoder::new(b'\n');
        let mut buffer = BytesMut::with_capacity(payload.len());
        buffer.extend_from_slice(&payload);

        let frame = decoder.decode(&mut buffer).unwrap();

        assert!(frame.is_some());
        assert!(buffer.is_empty());
    }

    /// Every line of a multi-line JSON log fixture is decoded as its own frame.
    #[test]
    fn decode_json_multiline() {
        let events = indoc! {r#"
            {"log":"\u0009at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)\n","stream":"stdout","time":"2019-01-18T07:49:27.374616758Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374640288Z"}
            {"log":"\u0009at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)\n","stream":"stdout","time":"2019-01-18T07:49:27.374655505Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374671955Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374690312Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)\n","stream":"stdout","time":"2019-01-18T07:49:27.374704522Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)\n","stream":"stdout","time":"2019-01-18T07:49:27.374718459Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)\n","stream":"stdout","time":"2019-01-18T07:49:27.374732919Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)\n","stream":"stdout","time":"2019-01-18T07:49:27.374750799Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374764819Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374778682Z"}
            {"log":"\u0009at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\n","stream":"stdout","time":"2019-01-18T07:49:27.374792429Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374805985Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374819625Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374833335Z"}
            {"log":"\u0009at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)\n","stream":"stdout","time":"2019-01-18T07:49:27.374847845Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374861925Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37487589Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374890043Z"}
            {"log":"\u0009at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)\n","stream":"stdout","time":"2019-01-18T07:49:27.374903813Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374917793Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374931586Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374946006Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)\n","stream":"stdout","time":"2019-01-18T07:49:27.37496104Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106)\n","stream":"stdout","time":"2019-01-18T07:49:27.37498773Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.375003113Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.375017063Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.37503086Z"}
            {"log":"\u0009at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)\n","stream":"stdout","time":"2019-01-18T07:49:27.3750454Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.37505928Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37507306Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.375086726Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\n","stream":"stdout","time":"2019-01-18T07:49:27.375100817Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)\n","stream":"stdout","time":"2019-01-18T07:49:27.375115354Z"}
            {"log":"\u0009at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)\n","stream":"stdout","time":"2019-01-18T07:49:27.375129454Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\n","stream":"stdout","time":"2019-01-18T07:49:27.375144001Z"}
            {"log":"\u0009at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)\n","stream":"stdout","time":"2019-01-18T07:49:27.375157464Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\n","stream":"stdout","time":"2019-01-18T07:49:27.375170981Z"}
            {"log":"\u0009at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)\n","stream":"stdout","time":"2019-01-18T07:49:27.375184417Z"}
            {"log":"\u0009at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:800)\n","stream":"stdout","time":"2019-01-18T07:49:27.375198024Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\n","stream":"stdout","time":"2019-01-18T07:49:27.375211594Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)\n","stream":"stdout","time":"2019-01-18T07:49:27.375225237Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)\n","stream":"stdout","time":"2019-01-18T07:49:27.375239487Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\n","stream":"stdout","time":"2019-01-18T07:49:27.375253464Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n","stream":"stdout","time":"2019-01-18T07:49:27.375323255Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n","stream":"stdout","time":"2019-01-18T07:49:27.375345642Z"}
            {"log":"\u0009at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\n","stream":"stdout","time":"2019-01-18T07:49:27.375363208Z"}
            {"log":"\u0009at java.lang.Thread.run(Thread.java:748)\n","stream":"stdout","time":"2019-01-18T07:49:27.375377695Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375391335Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375416915Z"}
            {"log":"2019-01-18 07:53:06.419 [ ] INFO 1 --- [vent-bus.prod-1] c.t.listener.CommonListener : warehousing Dailywarehousing.daily\n","stream":"stdout","time":"2019-01-18T07:53:06.420527437Z"}
        "#};

        let mut decoder = CharacterDelimitedDecoder::new(b'\n');
        let mut buffer = BytesMut::new();
        buffer.extend_from_slice(events.as_bytes());

        // Pull frames until the decoder reports that none are left, counting them.
        let decoded_frames = std::iter::from_fn(|| decoder.decode(&mut buffer).unwrap()).count();

        assert_eq!(decoded_frames, 51);
    }
}