1use bytes::{Buf, Bytes, BytesMut};
2use memchr::memchr;
3use tokio_util::codec::Decoder;
4use tracing::{trace, warn};
5use vector_config::configurable_component;
6
7use super::BoxedFramingError;
8
9#[configurable_component]
11#[derive(Debug, Clone)]
12pub struct CharacterDelimitedDecoderConfig {
13 pub character_delimited: CharacterDelimitedDecoderOptions,
15}
16
17impl CharacterDelimitedDecoderConfig {
18 pub const fn new(delimiter: u8) -> Self {
20 Self {
21 character_delimited: CharacterDelimitedDecoderOptions::new(delimiter, None),
22 }
23 }
24 pub const fn build(&self) -> CharacterDelimitedDecoder {
26 if let Some(max_length) = self.character_delimited.max_length {
27 CharacterDelimitedDecoder::new_with_max_length(
28 self.character_delimited.delimiter,
29 max_length,
30 )
31 } else {
32 CharacterDelimitedDecoder::new(self.character_delimited.delimiter)
33 }
34 }
35}
36
37#[configurable_component]
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct CharacterDelimitedDecoderOptions {
41 #[configurable(metadata(docs::type_override = "ascii_char"))]
43 #[serde(with = "vector_core::serde::ascii_char")]
44 pub delimiter: u8,
45
46 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
58 pub max_length: Option<usize>,
59}
60
61impl CharacterDelimitedDecoderOptions {
62 pub const fn new(delimiter: u8, max_length: Option<usize>) -> Self {
64 Self {
65 delimiter,
66 max_length,
67 }
68 }
69}
70
/// A decoder for handling bytes that are delimited by a chosen single-byte character.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct CharacterDelimitedDecoder {
    /// The byte that separates consecutive frames.
    pub delimiter: u8,
    /// The maximum length of a frame in bytes, not counting the delimiter.
    pub max_length: usize,
}

impl CharacterDelimitedDecoder {
    /// Creates a `CharacterDelimitedDecoder` with the specified delimiter and no effective
    /// length limit (`max_length` is `usize::MAX`).
    pub const fn new(delimiter: u8) -> Self {
        Self {
            delimiter,
            max_length: usize::MAX,
        }
    }

    /// Creates a `CharacterDelimitedDecoder` with the specified delimiter and a maximum frame
    /// length of `max_length` bytes.
    pub const fn new_with_max_length(delimiter: u8, max_length: usize) -> Self {
        Self {
            delimiter,
            max_length,
        }
    }

    /// Returns the maximum frame length this decoder accepts.
    pub const fn max_length(&self) -> usize {
        self.max_length
    }
}
104
105impl Decoder for CharacterDelimitedDecoder {
106 type Item = Bytes;
107 type Error = BoxedFramingError;
108
109 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
110 loop {
111 match memchr(self.delimiter, buf) {
118 None => return Ok(None),
119 Some(next_delimiter_idx) => {
120 if next_delimiter_idx > self.max_length {
121 warn!(
124 message = "Discarding frame larger than max_length.",
125 buf_len = buf.len(),
126 max_length = self.max_length,
127 internal_log_rate_limit = true
128 );
129 buf.advance(next_delimiter_idx + 1);
130 } else {
131 let frame = buf.split_to(next_delimiter_idx).freeze();
132 trace!(
133 message = "Decoding the frame.",
134 bytes_processed = frame.len()
135 );
136 buf.advance(1); return Ok(Some(frame));
138 }
139 }
140 }
141 }
142 }
143
144 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, Self::Error> {
145 match self.decode(buf)? {
146 Some(frame) => Ok(Some(frame)),
147 None => {
148 if buf.is_empty() {
149 Ok(None)
150 } else if buf.len() > self.max_length {
151 warn!(
152 message = "Discarding frame larger than max_length.",
153 buf_len = buf.len(),
154 max_length = self.max_length,
155 internal_log_rate_limit = true
156 );
157 Ok(None)
158 } else {
159 let bytes: Bytes = buf.split_to(buf.len()).freeze();
160 Ok(Some(bytes))
161 }
162 }
163 }
164 }
165}
166
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use bytes::BufMut;
    use indoc::indoc;

    use super::*;

    // A single delimited frame is returned without its delimiter.
    #[test]
    fn decode() {
        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();
        buf.put_slice(b"abc\n");
        assert_eq!(Some("abc".into()), codec.decode(buf).unwrap());
    }

    // Frames longer than `max_length` are skipped; `decode_eof` additionally yields the
    // undelimited remainder.
    #[test]
    fn decode_max_length() {
        const MAX_LENGTH: usize = 6;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode(buf).unwrap(), None);

        let buf = &mut BytesMut::new();

        buf.put_slice(b"1234567\n123456\n123412314\n123");

        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123456")));
        assert_eq!(codec.decode_eof(buf).unwrap(), Some(Bytes::from("123")));
        assert_eq!(codec.decode_eof(buf).unwrap(), None);
    }

    // Repeated calls on data that has no delimiter yet keep returning `None`.
    #[test]
    fn decode_discard_repeat() {
        const MAX_LENGTH: usize = 1;

        let mut codec = CharacterDelimitedDecoder::new_with_max_length(b'\n', MAX_LENGTH);
        let buf = &mut BytesMut::new();

        buf.reserve(200);
        buf.put(&b"aa"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
        buf.put(&b"a"[..]);
        assert!(codec.decode(buf).unwrap().is_none());
    }

    // A newline escaped inside a JSON string is not a raw delimiter byte.
    #[test]
    fn decode_json_escaped() {
        let mut input = HashMap::new();
        input.insert("key", "value");
        input.insert("new", "li\nne");

        let mut bytes = serde_json::to_vec(&input).unwrap();
        bytes.push(b'\n');

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        buf.reserve(bytes.len());
        buf.extend(bytes);

        let result = codec.decode(buf).unwrap();

        assert!(result.is_some());
        assert!(buf.is_empty());
    }

    // Every newline-terminated JSON document is decoded as its own frame.
    #[test]
    fn decode_json_multiline() {
        let events = indoc! {r#"
            {"log":"\u0009at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)\n","stream":"stdout","time":"2019-01-18T07:49:27.374616758Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374640288Z"}
            {"log":"\u0009at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)\n","stream":"stdout","time":"2019-01-18T07:49:27.374655505Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374671955Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)\n","stream":"stdout","time":"2019-01-18T07:49:27.374690312Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)\n","stream":"stdout","time":"2019-01-18T07:49:27.374704522Z"}
            {"log":"\u0009at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)\n","stream":"stdout","time":"2019-01-18T07:49:27.374718459Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)\n","stream":"stdout","time":"2019-01-18T07:49:27.374732919Z"}
            {"log":"\u0009at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)\n","stream":"stdout","time":"2019-01-18T07:49:27.374750799Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374764819Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374778682Z"}
            {"log":"\u0009at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)\n","stream":"stdout","time":"2019-01-18T07:49:27.374792429Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374805985Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374819625Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374833335Z"}
            {"log":"\u0009at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)\n","stream":"stdout","time":"2019-01-18T07:49:27.374847845Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374861925Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37487589Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374890043Z"}
            {"log":"\u0009at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93)\n","stream":"stdout","time":"2019-01-18T07:49:27.374903813Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.374917793Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.374931586Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.374946006Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117)\n","stream":"stdout","time":"2019-01-18T07:49:27.37496104Z"}
            {"log":"\u0009at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106)\n","stream":"stdout","time":"2019-01-18T07:49:27.37498773Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.375003113Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.375017063Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.37503086Z"}
            {"log":"\u0009at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)\n","stream":"stdout","time":"2019-01-18T07:49:27.3750454Z"}
            {"log":"\u0009at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)\n","stream":"stdout","time":"2019-01-18T07:49:27.37505928Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)\n","stream":"stdout","time":"2019-01-18T07:49:27.37507306Z"}
            {"log":"\u0009at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)\n","stream":"stdout","time":"2019-01-18T07:49:27.375086726Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)\n","stream":"stdout","time":"2019-01-18T07:49:27.375100817Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)\n","stream":"stdout","time":"2019-01-18T07:49:27.375115354Z"}
            {"log":"\u0009at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)\n","stream":"stdout","time":"2019-01-18T07:49:27.375129454Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)\n","stream":"stdout","time":"2019-01-18T07:49:27.375144001Z"}
            {"log":"\u0009at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)\n","stream":"stdout","time":"2019-01-18T07:49:27.375157464Z"}
            {"log":"\u0009at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)\n","stream":"stdout","time":"2019-01-18T07:49:27.375170981Z"}
            {"log":"\u0009at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)\n","stream":"stdout","time":"2019-01-18T07:49:27.375184417Z"}
            {"log":"\u0009at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:800)\n","stream":"stdout","time":"2019-01-18T07:49:27.375198024Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)\n","stream":"stdout","time":"2019-01-18T07:49:27.375211594Z"}
            {"log":"\u0009at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)\n","stream":"stdout","time":"2019-01-18T07:49:27.375225237Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)\n","stream":"stdout","time":"2019-01-18T07:49:27.375239487Z"}
            {"log":"\u0009at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)\n","stream":"stdout","time":"2019-01-18T07:49:27.375253464Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n","stream":"stdout","time":"2019-01-18T07:49:27.375323255Z"}
            {"log":"\u0009at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n","stream":"stdout","time":"2019-01-18T07:49:27.375345642Z"}
            {"log":"\u0009at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)\n","stream":"stdout","time":"2019-01-18T07:49:27.375363208Z"}
            {"log":"\u0009at java.lang.Thread.run(Thread.java:748)\n","stream":"stdout","time":"2019-01-18T07:49:27.375377695Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375391335Z"}
            {"log":"\n","stream":"stdout","time":"2019-01-18T07:49:27.375416915Z"}
            {"log":"2019-01-18 07:53:06.419 [ ] INFO 1 --- [vent-bus.prod-1] c.t.listener.CommonListener : warehousing Dailywarehousing.daily\n","stream":"stdout","time":"2019-01-18T07:53:06.420527437Z"}
        "#};

        let mut codec = CharacterDelimitedDecoder::new(b'\n');
        let buf = &mut BytesMut::new();

        // `events` is already a `&str`; copy its bytes directly instead of allocating a `String`.
        buf.extend_from_slice(events.as_bytes());

        let mut i = 0;
        while codec.decode(buf).unwrap().is_some() {
            i += 1;
        }

        assert_eq!(i, 51);
    }
}