1#![deny(missing_docs)]
4
5use std::{
6 collections::{HashMap, hash_map::Entry},
7 hash::Hash,
8 pin::Pin,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use bytes::{Bytes, BytesMut};
14use futures::{Stream, StreamExt};
15use pin_project::pin_project;
16use regex::bytes::Regex;
17use tokio_util::time::delay_queue::{DelayQueue, Key};
18use vector_lib::configurable::configurable_component;
19
#[configurable_component]
/// How `condition_pattern` is interpreted for a line that arrives while an
/// aggregate is already open for that line's source.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// Every consecutive line that matches the condition pattern is appended
    /// to the aggregate.
    ///
    /// The first line that does NOT match ends the aggregate. That line is
    /// not part of it and is processed again as a fresh line.
    ContinueThrough,

    /// Every consecutive line that matches the condition pattern is appended
    /// to the aggregate.
    ///
    /// The first line that does NOT match is also appended, as the final
    /// line, and then the aggregate ends.
    ContinuePast,

    /// Lines are appended until one matches the condition pattern.
    ///
    /// The matching line ends the aggregate without being part of it, and is
    /// processed again as a fresh line.
    HaltBefore,

    /// Lines are appended until one matches the condition pattern.
    ///
    /// The matching line is appended as the final line, and then the
    /// aggregate ends.
    HaltWith,
}
49
#[derive(Clone, Debug)]
/// Rules for merging consecutive lines from the same source into one message.
pub struct Config {
    /// Pattern a line must match to open a new aggregate when none is open
    /// for its source. Lines that do not match are emitted on their own.
    pub start_pattern: Regex,

    /// Pattern checked against every line that arrives while an aggregate is
    /// open for its source. Whether a match continues or ends the aggregate
    /// depends on `mode`.
    pub condition_pattern: Regex,

    /// Interpretation of `condition_pattern` (see [`Mode`]).
    pub mode: Mode,

    /// How long an open aggregate may wait for its next line before it is
    /// flushed as-is. Every line appended to the aggregate restarts this
    /// timer. [`LineAgg`] only acts on expired timers while its input stream
    /// has no line ready.
    pub timeout: Duration,
}
71
72impl Config {
73 pub fn for_legacy(marker: Regex, timeout_ms: u64) -> Self {
76 let start_pattern = marker;
77 let condition_pattern = start_pattern.clone();
78 let mode = Mode::HaltBefore;
79 let timeout = Duration::from_millis(timeout_ms);
80
81 Self {
82 start_pattern,
83 condition_pattern,
84 mode,
85 timeout,
86 }
87 }
88}
89
#[pin_project(project = LineAggProj)]
/// A [`Stream`] adapter that merges consecutive lines from the same source
/// into multi-line messages, driven by a [`Logic`].
///
/// Input items are `(source key, line, context)`. Output items are
/// `(source key, merged line, context of the first line, context of the last
/// merged line)`. The last element is `None` when the message is one line.
pub struct LineAgg<T, K, C> {
    /// Upstream stream of `(key, line, context)` items.
    #[pin]
    inner: T,

    /// Aggregation rules and per-source state.
    logic: Logic<K, C>,

    /// A line that closed the previous aggregate without being included in
    /// it. It is re-processed on the next poll, before any new input is read.
    stashed: Option<(K, Bytes, C)>,

    /// Set once `inner` has ended. Holds the remaining open aggregates, which
    /// are emitted one per poll; the stream ends when the list is empty.
    draining: Option<Vec<(K, Bytes, C, Option<C>)>>,
}
113
/// Stream-independent line aggregation state machine.
///
/// Keeps at most one open aggregate per source key `K`. Each aggregate is
/// paired with an entry in a timeout queue so that an idle aggregate can be
/// flushed.
pub struct Logic<K, C> {
    /// Aggregation rules.
    config: Config,

    /// Open aggregates by source key, each with the key of its entry in
    /// `timeouts`.
    buffers: HashMap<K, (Key, Aggregate<C>)>,

    /// One timer per open aggregate. An expired entry yields the source key
    /// whose aggregate should be flushed.
    timeouts: DelayQueue<K>,
}
129
130impl<K, C> Logic<K, C> {
131 pub fn new(config: Config) -> Self {
133 Self {
134 config,
135 buffers: HashMap::new(),
136 timeouts: DelayQueue::new(),
137 }
138 }
139}
140
141impl<T, K, C> LineAgg<T, K, C>
142where
143 T: Stream<Item = (K, Bytes, C)> + Unpin,
144 K: Hash + Eq + Clone,
145{
146 pub const fn new(inner: T, logic: Logic<K, C>) -> Self {
149 Self {
150 inner,
151 logic,
152 draining: None,
153 stashed: None,
154 }
155 }
156}
157
158impl<T, K, C> Stream for LineAgg<T, K, C>
159where
160 T: Stream<Item = (K, Bytes, C)> + Unpin,
161 K: Hash + Eq + Clone,
162{
163 type Item = (K, Bytes, C, Option<C>);
168
169 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170 let mut this = self.project();
171 loop {
172 if let Some((src, line, context)) = this.stashed.take() {
174 if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context) {
179 return Poll::Ready(Some(val));
180 }
181 continue;
182 }
183
184 if let Some(to_drain) = &mut this.draining {
186 match to_drain.pop() {
187 Some(val) => {
188 return Poll::Ready(Some(val));
189 }
190 _ => {
191 return Poll::Ready(None);
192 }
193 }
194 }
195
196 match this.inner.poll_next_unpin(cx) {
197 Poll::Ready(Some((src, line, context))) => {
198 if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context)
202 {
203 return Poll::Ready(Some(val));
204 }
205 }
206 Poll::Ready(None) => {
207 *this.draining = Some(
210 this.logic
211 .buffers
212 .drain()
213 .map(|(src, (_, aggregate))| {
214 let (line, initial_context, last_context) = aggregate.merge();
215 (src, line, initial_context, last_context)
216 })
217 .collect(),
218 );
219 }
220 Poll::Pending => {
221 while let Poll::Ready(Some(expired_key)) = this.logic.timeouts.poll_expired(cx)
224 {
225 let key = expired_key.into_inner();
226 if let Some((_, aggregate)) = this.logic.buffers.remove(&key) {
227 let (line, initial_context, last_context) = aggregate.merge();
228 return Poll::Ready(Some((key, line, initial_context, last_context)));
229 }
230 }
231
232 return Poll::Pending;
233 }
234 };
235 }
236 }
237}
238
239impl<T, K, C> LineAgg<T, K, C>
240where
241 T: Stream<Item = (K, Bytes, C)> + Unpin,
242 K: Hash + Eq + Clone,
243{
244 fn handle_line_and_stashing(
248 this: &mut LineAggProj<'_, T, K, C>,
249 src: K,
250 line: Bytes,
251 context: C,
252 ) -> Option<(K, Bytes, C, Option<C>)> {
253 debug_assert!(this.stashed.is_none());
257 let val = this.logic.handle_line(src, line, context)?;
258 let val = match val {
259 (src, Emit::One((line, initial_context, last_context))) => {
262 (src, line, initial_context, last_context)
263 }
264 (
269 src,
270 Emit::Two(
271 (line, initial_context, last_context),
272 (line_to_stash, context_to_stash, _),
273 ),
274 ) => {
275 *this.stashed = Some((src.clone(), line_to_stash, context_to_stash));
276 (src, line, initial_context, last_context)
277 }
278 };
279 Some(val)
280 }
281}
282
/// What [`Logic::handle_line`] hands back for a single input line.
pub enum Emit<T> {
    /// A single value to emit.
    One(T),

    /// Two values. As produced by [`Logic::handle_line`], the first is the
    /// completed aggregate and the second is the line that closed it without
    /// being included in it.
    Two(T, T),
}
291
/// What to do with an open aggregate after evaluating the condition pattern
/// against a new line from the same source.
enum Decision {
    /// Append the line and keep the aggregate open.
    Continue,
    /// Append the line, then close and emit the aggregate.
    EndInclude,
    /// Close and emit the aggregate without the line; the line is handed back
    /// separately.
    EndExclude,
}
298
299impl<K, C> Logic<K, C>
300where
301 K: Hash + Eq + Clone,
302{
303 pub fn handle_line(
305 &mut self,
306 src: K,
307 line: Bytes,
308 context: C,
309 ) -> Option<(K, Emit<(Bytes, C, Option<C>)>)> {
310 match self.buffers.entry(src) {
312 Entry::Occupied(mut entry) => {
313 let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
314 let decision = match (self.config.mode, condition_matched) {
315 (Mode::ContinueThrough, true) => Decision::Continue,
318 (Mode::ContinueThrough, false) => Decision::EndExclude,
319 (Mode::ContinuePast, true) => Decision::Continue,
322 (Mode::ContinuePast, false) => Decision::EndInclude,
323 (Mode::HaltBefore, true) => Decision::EndExclude,
326 (Mode::HaltBefore, false) => Decision::Continue,
327 (Mode::HaltWith, true) => Decision::EndInclude,
330 (Mode::HaltWith, false) => Decision::Continue,
331 };
332
333 match decision {
334 Decision::Continue => {
335 let buffered = entry.get_mut();
336 self.timeouts.reset(&buffered.0, self.config.timeout);
337 buffered.1.add_next_line(line, context);
338 None
339 }
340 Decision::EndInclude => {
341 let (src, (key, mut buffered)) = entry.remove_entry();
342 self.timeouts.remove(&key);
343 buffered.add_next_line(line, context);
344 Some((src, Emit::One(buffered.merge())))
345 }
346 Decision::EndExclude => {
347 let (src, (key, buffered)) = entry.remove_entry();
348 self.timeouts.remove(&key);
349 Some((src, Emit::Two(buffered.merge(), (line, context, None))))
350 }
351 }
352 }
353 Entry::Vacant(entry) => {
354 if self.config.start_pattern.is_match(line.as_ref()) {
356 let key = self
359 .timeouts
360 .insert(entry.key().clone(), self.config.timeout);
361 entry.insert((key, Aggregate::new(line, context)));
362 None
363 } else {
364 Some((entry.into_key(), Emit::One((line, context, None))))
366 }
367 }
368 }
369 }
370}
371
/// Lines collected so far for one source, plus the contexts of the first and
/// the most recently appended line.
struct Aggregate<C> {
    /// Lines in arrival order. Never empty: an aggregate is created with its
    /// first line and lines are only ever appended.
    lines: Vec<Bytes>,
    /// Context of the first line.
    initial_context: C,
    /// Context of the most recently appended line; `None` until a second line
    /// is added.
    last_context: Option<C>,
}
377
378impl<C> Aggregate<C> {
379 fn new(first_line: Bytes, initial_context: C) -> Self {
380 Self {
381 lines: vec![first_line],
382 initial_context,
383 last_context: None,
384 }
385 }
386
387 fn add_next_line(&mut self, line: Bytes, context: C) {
388 self.last_context = Some(context);
389 self.lines.push(line);
390 }
391
392 fn merge(self) -> (Bytes, C, Option<C>) {
393 let capacity = self.lines.iter().map(|line| line.len() + 1).sum::<usize>() - 1;
394 let mut bytes_mut = BytesMut::with_capacity(capacity);
395 let mut first = true;
396 for line in self.lines {
397 if first {
398 first = false;
399 } else {
400 bytes_mut.extend_from_slice(b"\n");
401 }
402 bytes_mut.extend_from_slice(&line);
403 }
404 (bytes_mut.freeze(), self.initial_context, self.last_context)
405 }
406}
407
#[cfg(test)]
mod tests {
    use std::fmt::Write as _;

    use bytes::Bytes;
    use futures::SinkExt;
    use similar_asserts::assert_eq;

    use super::*;

    /// `ContinueThrough`: lines starting with whitespace are appended to the
    /// aggregate opened by the preceding non-whitespace line. The aggregate
    /// still open at the end of input is flushed when the stream ends.
    #[tokio::test]
    async fn mode_continue_through_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part",
            " second part",
            " last part",
            "another normal message",
            "finishing message",
            " last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part\n", " second part\n", " last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message\n",
                    " last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `ContinuePast`: lines ending in a backslash continue the message, and
    /// the first line without one is included as its final line.
    #[tokio::test]
    async fn mode_continue_past_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part \\",
            "second part \\",
            "last part",
            "another normal message",
            "finishing message \\",
            "last part of the incomplete finishing message \\",
        ];
        let config = Config {
            start_pattern: Regex::new("\\\\$").unwrap(),
            condition_pattern: Regex::new("\\\\$").unwrap(),
            mode: Mode::ContinuePast,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part \\\n", "second part \\\n", "last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message \\\n",
                    "last part of the incomplete finishing message \\"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `HaltBefore`: every line opens an aggregate (empty start pattern), and
    /// a line starting with a log level closes the previous aggregate before
    /// starting the next one.
    #[tokio::test]
    async fn mode_halt_before_1() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^(INFO|ERROR) ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `HaltWith`: a line without a trailing `;` opens an aggregate, and the
    /// next line ending in `;` is included as its final line.
    #[tokio::test]
    async fn mode_halt_with_1() {
        let lines = vec![
            "some usual line;",
            "some other usual line;",
            "first part",
            "second part",
            "last part;",
            "another normal message;",
            "finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("[^;]$").unwrap(),
            condition_pattern: Regex::new(";$").unwrap(),
            mode: Mode::HaltWith,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line;", 0, None),
            ("some other usual line;", 1, None),
            (
                concat!("first part\n", "second part\n", "last part;"),
                2,
                Some(4),
            ),
            ("another normal message;", 5, None),
            (
                concat!(
                    "finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// A Java stack trace: indented `at` frames join the exception line.
    #[tokio::test]
    async fn use_case_java_exception() {
        let lines = vec![
            "java.lang.Exception",
            " at com.foo.bar(bar.java:123)",
            " at com.foo.baz(baz.java:456)",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+at").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "java.lang.Exception\n",
                " at com.foo.bar(bar.java:123)\n",
                " at com.foo.baz(baz.java:456)",
            ),
            0,
            Some(2),
        )];
        run_and_assert(&lines, config, &expected).await;
    }

    /// A Ruby backtrace: tab-indented `from` frames join the error line.
    #[tokio::test]
    async fn use_case_ruby_exception() {
        let lines = vec![
            "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
            "\tfrom foobar.rb:6:in `bar'",
            "\tfrom foobar.rb:2:in `foo'",
            "\tfrom foobar.rb:9:in `<main>'",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+from").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n",
                "\tfrom foobar.rb:6:in `bar'\n",
                "\tfrom foobar.rb:2:in `foo'\n",
                "\tfrom foobar.rb:9:in `<main>'",
            ),
            0,
            Some(3),
        )];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `ContinueThrough` with the same start and condition pattern. A
    /// non-indented line after an indented run closes the run, which makes
    /// the logic emit two values at once (`Emit::Two`). The closing line is
    /// then re-processed on its own.
    #[tokio::test]
    async fn two_lines_emit_with_continue_through() {
        let lines = vec![
            "not merged 1", // emitted alone
            " merged 1",
            " merged 2",
            "not merged 2", // closes "merged 1..2"
            " merged 3",
            " merged 4",
            "not merged 3", // closes "merged 3..4"
            "not merged 4",
            " merged 5",
            "not merged 5", // closes the single-line "merged 5"
            " merged 6",
            " merged 7",
            " merged 8",
            "not merged 6", // closes "merged 6..8"
        ];
        let config = Config {
            start_pattern: Regex::new("^\\s").unwrap(),
            condition_pattern: Regex::new("^\\s").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("not merged 1", 0, None),
            (" merged 1\n merged 2", 1, Some(2)),
            ("not merged 2", 3, None),
            (" merged 3\n merged 4", 4, Some(5)),
            ("not merged 3", 6, None),
            ("not merged 4", 7, None),
            (" merged 5", 8, None),
            ("not merged 5", 9, None),
            (" merged 6\n merged 7\n merged 8", 10, Some(12)),
            ("not merged 6", 13, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `HaltBefore` with an empty start pattern, so every line opens an
    /// aggregate. Each `START` line closes the previous aggregate and, after
    /// being re-processed, opens the next one.
    #[tokio::test]
    async fn two_lines_emit_with_halt_before() {
        let lines = vec![
            "part 0.1",
            "part 0.2",
            "START msg 1", // closes "part 0.1..0.2"
            "part 1.1",
            "part 1.2",
            "START msg 2", // closes "msg 1"
            "START msg 3", // closes the single-line "msg 2"
            "part 3.1",
            "START msg 4", // closes "msg 3"
            "part 4.1",
            "part 4.2",
            "part 4.3",
            "START msg 5", // closes "msg 4"; flushed alone at end of input
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^START ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("part 0.1\npart 0.2", 0, Some(1)),
            ("START msg 1\npart 1.1\npart 1.2", 2, Some(4)),
            ("START msg 2", 5, None),
            ("START msg 3\npart 3.1", 6, Some(7)),
            ("START msg 4\npart 4.1\npart 4.2\npart 4.3", 8, Some(11)),
            ("START msg 5", 12, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    /// `Config::for_legacy` must reproduce the `HaltBefore` result of
    /// `mode_halt_before_1` when given a single marker regex.
    #[tokio::test]
    async fn legacy() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];

        let stream = stream_from_lines(&lines);
        let line_agg = LineAgg::new(
            stream,
            Logic::new(Config::for_legacy(
                Regex::new("^(INFO|ERROR)").unwrap(),
                10, // timeout in milliseconds
            )),
        );
        let results = line_agg.collect().await;
        assert_results(results, &expected);
    }

    /// Appending a line restarts the aggregate's timeout. Lines are sent
    /// about 1 ms apart, well under the 10 ms timeout, so all of them must
    /// end up in one message even though the whole send takes much longer
    /// than 10 ms.
    ///
    /// NOTE(review): this relies on real time. With n = 1000 it sleeps for at
    /// least ~1 s, and it could flake if the runtime stalls for more than
    /// 10 ms between sends.
    #[tokio::test]
    async fn timeout_resets_on_new_line() {
        let n: usize = 1000;
        let mut lines = vec![
            "START msg 1".to_string(), // opens the only aggregate
        ];
        for i in 0..n {
            lines.push(format!("line {i}"));
        }
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^START ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };

        let mut expected = "START msg 1".to_string();
        for i in 0..n {
            write!(expected, "\nline {i}").expect("write to String never fails");
        }

        let (mut send, recv) = futures::channel::mpsc::unbounded();

        let logic = Logic::new(config);
        let line_agg = LineAgg::new(recv, logic);
        let results = tokio::spawn(line_agg.collect());

        for (index, line) in lines.iter().enumerate() {
            let data = (
                "test.log".to_owned(),
                Bytes::copy_from_slice(line.as_bytes()),
                index,
            );
            send.send(data).await.unwrap();
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        // Closing the channel ends the input stream, which flushes the aggregate.
        drop(send);

        assert_results(
            results.await.unwrap(),
            &[(expected.as_str(), 0, Some(lines.len() - 1))],
        );
    }

    /// Source key used by every test; all lines come from one fake file.
    type Filename = String;

    /// Turns `lines` into an always-ready stream of `("test.log", line,
    /// index)` items. The line's index serves as its context.
    fn stream_from_lines<'a>(
        lines: &'a [&'static str],
    ) -> impl Stream<Item = (Filename, Bytes, usize)> + 'a {
        futures::stream::iter(lines.iter().enumerate().map(|(index, line)| {
            (
                "test.log".to_owned(),
                Bytes::from_static(line.as_bytes()),
                index,
            )
        }))
    }

    /// Asserts that `actual` equals `expected`, in order, after attaching the
    /// `"test.log"` source key to each expected
    /// `(line, first index, last index)` triple.
    fn assert_results(
        actual: Vec<(Filename, Bytes, usize, Option<usize>)>,
        expected: &[(&str, usize, Option<usize>)],
    ) {
        let expected_mapped: Vec<(Filename, Bytes, usize, Option<usize>)> = expected
            .iter()
            .map(|(line, context, last_context)| {
                (
                    "test.log".to_owned(),
                    Bytes::copy_from_slice(line.as_bytes()),
                    *context,
                    *last_context,
                )
            })
            .collect();

        assert_eq!(
            actual, expected_mapped,
            "actual on the left, expected on the right",
        );
    }

    /// Runs `lines` through a `LineAgg` built from `config` and checks the
    /// collected output against `expected`.
    async fn run_and_assert(
        lines: &[&'static str],
        config: Config,
        expected: &[(&'static str, usize, Option<usize>)],
    ) {
        let stream = stream_from_lines(lines);
        let logic = Logic::new(config);
        let line_agg = LineAgg::new(stream, logic);
        let results = line_agg.collect().await;
        assert_results(results, expected);
    }
}