1#![deny(missing_docs)]
4
5use std::{
6 collections::{hash_map::Entry, HashMap},
7 hash::Hash,
8 pin::Pin,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use bytes::{Bytes, BytesMut};
14use futures::{Stream, StreamExt};
15use pin_project::pin_project;
16use regex::bytes::Regex;
17use tokio_util::time::delay_queue::{DelayQueue, Key};
18use vector_lib::configurable::configurable_component;
19
#[configurable_component]
/// Mode of operation of the line aggregator.
///
/// While an aggregate is being buffered for a source, every following line is
/// tested against the condition pattern; the mode decides what a match (or the
/// absence of one) means for the group being built.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// All consecutive lines matching the condition pattern are included in the group.
    ///
    /// The first line that does not match ends the group and is not part of it;
    /// that line is then processed on its own.
    ///
    /// Useful, for example, for stack traces where leading whitespace marks a
    /// line as an extension of the preceding one.
    ContinueThrough,

    /// All consecutive lines matching the condition pattern, plus one additional
    /// line, are included in the group.
    ///
    /// Useful when a line ends with a continuation marker, such as a backslash,
    /// announcing that the next line belongs to the same message.
    ContinuePast,

    /// All consecutive lines not matching the condition pattern are included in the group.
    ///
    /// The first matching line ends the group without being part of it; that
    /// line is then processed on its own.
    HaltBefore,

    /// All consecutive lines, up to and including the first line matching the
    /// condition pattern, are included in the group.
    ///
    /// Useful when a line ends with a termination marker, such as a semicolon.
    HaltWith,
}
49
/// Configuration of multi-line aggregation.
#[derive(Clone, Debug)]
pub struct Config {
    /// Pattern a line has to match to begin a new aggregate.
    ///
    /// A line that arrives while nothing is buffered for its source and does
    /// not match this pattern is emitted as-is.
    pub start_pattern: Regex,

    /// Pattern tested against every line that arrives while an aggregate is
    /// being buffered for its source.
    ///
    /// How a match is interpreted depends on `mode`.
    pub condition_pattern: Regex,

    /// Mode of operation; specifies how `condition_pattern` is interpreted.
    pub mode: Mode,

    /// How long a buffered aggregate waits for another line.
    ///
    /// The timer is restarted every time a line is appended to the aggregate;
    /// when it expires the aggregate is emitted, even if incomplete.
    pub timeout: Duration,
}
71
72impl Config {
73 pub fn for_legacy(marker: Regex, timeout_ms: u64) -> Self {
76 let start_pattern = marker;
77 let condition_pattern = start_pattern.clone();
78 let mode = Mode::HaltBefore;
79 let timeout = Duration::from_millis(timeout_ms);
80
81 Self {
82 start_pattern,
83 condition_pattern,
84 mode,
85 timeout,
86 }
87 }
88}
89
#[pin_project(project = LineAggProj)]
/// Line aggregator.
///
/// A stream adapter: it reads `(source, line, context)` items from an inner
/// stream and yields `(source, line, initial context, last context)` items in
/// which lines of the same source may have been merged together according to
/// the configured [`Logic`].
pub struct LineAgg<T, K, C> {
    #[pin]
    /// The stream the lines are read from.
    inner: T,

    /// The aggregation state and decision logic.
    logic: Logic<K, C>,

    /// A line that still has to be run through the logic before the inner
    /// stream is polled again.
    stashed: Option<(K, Bytes, C)>,

    /// Set once the inner stream has ended: the remaining buffered aggregates,
    /// handed out one per poll until empty, after which the stream ends.
    draining: Option<Vec<(K, Bytes, C, Option<C>)>>,
}
113
/// Core line aggregation logic.
///
/// Holds the state needed to aggregate lines per source: the configuration,
/// the aggregates currently being built, and their timeouts.
pub struct Logic<K, C> {
    /// Configuration parameters to use.
    config: Config,

    /// Aggregate currently being built for each source, together with the key
    /// of its entry in `timeouts`.
    buffers: HashMap<K, (Key, Aggregate<C>)>,

    /// Queue of sources whose buffered aggregate is waiting for its timeout.
    timeouts: DelayQueue<K>,
}
129
130impl<K, C> Logic<K, C> {
131 pub fn new(config: Config) -> Self {
133 Self {
134 config,
135 buffers: HashMap::new(),
136 timeouts: DelayQueue::new(),
137 }
138 }
139}
140
141impl<T, K, C> LineAgg<T, K, C>
142where
143 T: Stream<Item = (K, Bytes, C)> + Unpin,
144 K: Hash + Eq + Clone,
145{
146 pub const fn new(inner: T, logic: Logic<K, C>) -> Self {
149 Self {
150 inner,
151 logic,
152 draining: None,
153 stashed: None,
154 }
155 }
156}
157
158impl<T, K, C> Stream for LineAgg<T, K, C>
159where
160 T: Stream<Item = (K, Bytes, C)> + Unpin,
161 K: Hash + Eq + Clone,
162{
163 type Item = (K, Bytes, C, Option<C>);
168
169 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170 let mut this = self.project();
171 loop {
172 if let Some((src, line, context)) = this.stashed.take() {
174 if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context) {
179 return Poll::Ready(Some(val));
180 }
181 continue;
182 }
183
184 if let Some(to_drain) = &mut this.draining {
186 match to_drain.pop() {
187 Some(val) => {
188 return Poll::Ready(Some(val));
189 }
190 _ => {
191 return Poll::Ready(None);
192 }
193 }
194 }
195
196 match this.inner.poll_next_unpin(cx) {
197 Poll::Ready(Some((src, line, context))) => {
198 if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context)
202 {
203 return Poll::Ready(Some(val));
204 }
205 }
206 Poll::Ready(None) => {
207 *this.draining = Some(
210 this.logic
211 .buffers
212 .drain()
213 .map(|(src, (_, aggregate))| {
214 let (line, initial_context, last_context) = aggregate.merge();
215 (src, line, initial_context, last_context)
216 })
217 .collect(),
218 );
219 }
220 Poll::Pending => {
221 while let Poll::Ready(Some(expired_key)) = this.logic.timeouts.poll_expired(cx)
224 {
225 let key = expired_key.into_inner();
226 if let Some((_, aggregate)) = this.logic.buffers.remove(&key) {
227 let (line, initial_context, last_context) = aggregate.merge();
228 return Poll::Ready(Some((key, line, initial_context, last_context)));
229 }
230 }
231
232 return Poll::Pending;
233 }
234 };
235 }
236 }
237}
238
239impl<T, K, C> LineAgg<T, K, C>
240where
241 T: Stream<Item = (K, Bytes, C)> + Unpin,
242 K: Hash + Eq + Clone,
243{
244 fn handle_line_and_stashing(
248 this: &mut LineAggProj<'_, T, K, C>,
249 src: K,
250 line: Bytes,
251 context: C,
252 ) -> Option<(K, Bytes, C, Option<C>)> {
253 debug_assert!(this.stashed.is_none());
257 let val = this.logic.handle_line(src, line, context)?;
258 let val = match val {
259 (src, Emit::One((line, initial_context, last_context))) => {
262 (src, line, initial_context, last_context)
263 }
264 (
269 src,
270 Emit::Two(
271 (line, initial_context, last_context),
272 (line_to_stash, context_to_stash, _),
273 ),
274 ) => {
275 *this.stashed = Some((src.clone(), line_to_stash, context_to_stash));
276 (src, line, initial_context, last_context)
277 }
278 };
279 Some(val)
280 }
281}
282
/// The lines produced in response to a single input line.
///
/// There is no variant for "nothing"; that case is expressed with `Option` at
/// the call sites.
pub enum Emit<T> {
    /// Emit one line.
    One(T),
    /// Emit two lines, in this order.
    Two(T, T),
}
291
/// What to do with a line that arrives while an aggregate is being buffered
/// for its source.
enum Decision {
    /// Append the line to the aggregate and keep buffering.
    Continue,
    /// Append the line to the aggregate and emit the aggregate.
    EndInclude,
    /// Emit the aggregate without the line; the line is emitted separately.
    EndExclude,
}
298
299impl<K, C> Logic<K, C>
300where
301 K: Hash + Eq + Clone,
302{
303 pub fn handle_line(
305 &mut self,
306 src: K,
307 line: Bytes,
308 context: C,
309 ) -> Option<(K, Emit<(Bytes, C, Option<C>)>)> {
310 match self.buffers.entry(src) {
312 Entry::Occupied(mut entry) => {
313 let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
314 let decision = match (self.config.mode, condition_matched) {
315 (Mode::ContinueThrough, true) => Decision::Continue,
318 (Mode::ContinueThrough, false) => Decision::EndExclude,
319 (Mode::ContinuePast, true) => Decision::Continue,
322 (Mode::ContinuePast, false) => Decision::EndInclude,
323 (Mode::HaltBefore, true) => Decision::EndExclude,
326 (Mode::HaltBefore, false) => Decision::Continue,
327 (Mode::HaltWith, true) => Decision::EndInclude,
330 (Mode::HaltWith, false) => Decision::Continue,
331 };
332
333 match decision {
334 Decision::Continue => {
335 let buffered = entry.get_mut();
336 self.timeouts.reset(&buffered.0, self.config.timeout);
337 buffered.1.add_next_line(line, context);
338 None
339 }
340 Decision::EndInclude => {
341 let (src, (key, mut buffered)) = entry.remove_entry();
342 self.timeouts.remove(&key);
343 buffered.add_next_line(line, context);
344 Some((src, Emit::One(buffered.merge())))
345 }
346 Decision::EndExclude => {
347 let (src, (key, buffered)) = entry.remove_entry();
348 self.timeouts.remove(&key);
349 Some((src, Emit::Two(buffered.merge(), (line, context, None))))
350 }
351 }
352 }
353 Entry::Vacant(entry) => {
354 if self.config.start_pattern.is_match(line.as_ref()) {
356 let key = self
359 .timeouts
360 .insert(entry.key().clone(), self.config.timeout);
361 entry.insert((key, Aggregate::new(line, context)));
362 None
363 } else {
364 Some((entry.into_key(), Emit::One((line, context, None))))
366 }
367 }
368 }
369 }
370}
371
/// A group of lines being collected for one source.
struct Aggregate<C> {
    /// The collected lines, in arrival order; never empty once constructed
    /// through `new`.
    lines: Vec<Bytes>,
    /// Context that came with the first line.
    initial_context: C,
    /// Context that came with the most recently appended line, if any line
    /// was appended after the first one.
    last_context: Option<C>,
}
377
378impl<C> Aggregate<C> {
379 fn new(first_line: Bytes, initial_context: C) -> Self {
380 Self {
381 lines: vec![first_line],
382 initial_context,
383 last_context: None,
384 }
385 }
386
387 fn add_next_line(&mut self, line: Bytes, context: C) {
388 self.last_context = Some(context);
389 self.lines.push(line);
390 }
391
392 fn merge(self) -> (Bytes, C, Option<C>) {
393 let capacity = self.lines.iter().map(|line| line.len() + 1).sum::<usize>() - 1;
394 let mut bytes_mut = BytesMut::with_capacity(capacity);
395 let mut first = true;
396 for line in self.lines {
397 if first {
398 first = false;
399 } else {
400 bytes_mut.extend_from_slice(b"\n");
401 }
402 bytes_mut.extend_from_slice(&line);
403 }
404 (bytes_mut.freeze(), self.initial_context, self.last_context)
405 }
406}
407
408#[cfg(test)]
409mod tests {
410 use bytes::Bytes;
411 use futures::SinkExt;
412 use similar_asserts::assert_eq;
413 use std::fmt::Write as _;
414
415 use super::*;
416
417 #[tokio::test]
418 async fn mode_continue_through_1() {
419 let lines = vec![
420 "some usual line",
421 "some other usual line",
422 "first part",
423 " second part",
424 " last part",
425 "another normal message",
426 "finishing message",
427 " last part of the incomplete finishing message",
428 ];
429 let config = Config {
430 start_pattern: Regex::new("^[^\\s]").unwrap(),
431 condition_pattern: Regex::new("^[\\s]+").unwrap(),
432 mode: Mode::ContinueThrough,
433 timeout: Duration::from_millis(10),
434 };
435 let expected = vec![
436 ("some usual line", 0, None),
437 ("some other usual line", 1, None),
438 (
439 concat!("first part\n", " second part\n", " last part"),
440 2,
441 Some(4),
442 ),
443 ("another normal message", 5, None),
444 (
445 concat!(
446 "finishing message\n",
447 " last part of the incomplete finishing message"
448 ),
449 6,
450 Some(7),
451 ),
452 ];
453 run_and_assert(&lines, config, &expected).await;
454 }
455
456 #[tokio::test]
457 async fn mode_continue_past_1() {
458 let lines = vec![
459 "some usual line",
460 "some other usual line",
461 "first part \\",
462 "second part \\",
463 "last part",
464 "another normal message",
465 "finishing message \\",
466 "last part of the incomplete finishing message \\",
467 ];
468 let config = Config {
469 start_pattern: Regex::new("\\\\$").unwrap(),
470 condition_pattern: Regex::new("\\\\$").unwrap(),
471 mode: Mode::ContinuePast,
472 timeout: Duration::from_millis(10),
473 };
474 let expected = vec![
475 ("some usual line", 0, None),
476 ("some other usual line", 1, None),
477 (
478 concat!("first part \\\n", "second part \\\n", "last part"),
479 2,
480 Some(4),
481 ),
482 ("another normal message", 5, None),
483 (
484 concat!(
485 "finishing message \\\n",
486 "last part of the incomplete finishing message \\"
487 ),
488 6,
489 Some(7),
490 ),
491 ];
492 run_and_assert(&lines, config, &expected).await;
493 }
494
495 #[tokio::test]
496 async fn mode_halt_before_1() {
497 let lines = vec![
498 "INFO some usual line",
499 "INFO some other usual line",
500 "INFO first part",
501 "second part",
502 "last part",
503 "ERROR another normal message",
504 "ERROR finishing message",
505 "last part of the incomplete finishing message",
506 ];
507 let config = Config {
508 start_pattern: Regex::new("").unwrap(),
509 condition_pattern: Regex::new("^(INFO|ERROR) ").unwrap(),
510 mode: Mode::HaltBefore,
511 timeout: Duration::from_millis(10),
512 };
513 let expected = vec![
514 ("INFO some usual line", 0, None),
515 ("INFO some other usual line", 1, None),
516 (
517 concat!("INFO first part\n", "second part\n", "last part"),
518 2,
519 Some(4),
520 ),
521 ("ERROR another normal message", 5, None),
522 (
523 concat!(
524 "ERROR finishing message\n",
525 "last part of the incomplete finishing message"
526 ),
527 6,
528 Some(7),
529 ),
530 ];
531 run_and_assert(&lines, config, &expected).await;
532 }
533
534 #[tokio::test]
535 async fn mode_halt_with_1() {
536 let lines = vec![
537 "some usual line;",
538 "some other usual line;",
539 "first part",
540 "second part",
541 "last part;",
542 "another normal message;",
543 "finishing message",
544 "last part of the incomplete finishing message",
545 ];
546 let config = Config {
547 start_pattern: Regex::new("[^;]$").unwrap(),
548 condition_pattern: Regex::new(";$").unwrap(),
549 mode: Mode::HaltWith,
550 timeout: Duration::from_millis(10),
551 };
552 let expected = vec![
553 ("some usual line;", 0, None),
554 ("some other usual line;", 1, None),
555 (
556 concat!("first part\n", "second part\n", "last part;"),
557 2,
558 Some(4),
559 ),
560 ("another normal message;", 5, None),
561 (
562 concat!(
563 "finishing message\n",
564 "last part of the incomplete finishing message"
565 ),
566 6,
567 Some(7),
568 ),
569 ];
570 run_and_assert(&lines, config, &expected).await;
571 }
572
573 #[tokio::test]
574 async fn use_case_java_exception() {
575 let lines = vec![
576 "java.lang.Exception",
577 " at com.foo.bar(bar.java:123)",
578 " at com.foo.baz(baz.java:456)",
579 ];
580 let config = Config {
581 start_pattern: Regex::new("^[^\\s]").unwrap(),
582 condition_pattern: Regex::new("^[\\s]+at").unwrap(),
583 mode: Mode::ContinueThrough,
584 timeout: Duration::from_millis(10),
585 };
586 let expected = vec![(
587 concat!(
588 "java.lang.Exception\n",
589 " at com.foo.bar(bar.java:123)\n",
590 " at com.foo.baz(baz.java:456)",
591 ),
592 0,
593 Some(2),
594 )];
595 run_and_assert(&lines, config, &expected).await;
596 }
597
598 #[tokio::test]
599 async fn use_case_ruby_exception() {
600 let lines = vec![
601 "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
602 "\tfrom foobar.rb:6:in `bar'",
603 "\tfrom foobar.rb:2:in `foo'",
604 "\tfrom foobar.rb:9:in `<main>'",
605 ];
606 let config = Config {
607 start_pattern: Regex::new("^[^\\s]").unwrap(),
608 condition_pattern: Regex::new("^[\\s]+from").unwrap(),
609 mode: Mode::ContinueThrough,
610 timeout: Duration::from_millis(10),
611 };
612 let expected = vec![(
613 concat!(
614 "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n",
615 "\tfrom foobar.rb:6:in `bar'\n",
616 "\tfrom foobar.rb:2:in `foo'\n",
617 "\tfrom foobar.rb:9:in `<main>'",
618 ),
619 0,
620 Some(3),
621 )];
622 run_and_assert(&lines, config, &expected).await;
623 }
624
625 #[tokio::test]
627 async fn two_lines_emit_with_continue_through() {
628 let lines = vec![
629 "not merged 1", " merged 1",
631 " merged 2",
632 "not merged 2", " merged 3",
634 " merged 4",
635 "not merged 3", "not merged 4", " merged 5",
638 "not merged 5", " merged 6",
640 " merged 7",
641 " merged 8",
642 "not merged 6", ];
644 let config = Config {
645 start_pattern: Regex::new("^\\s").unwrap(),
646 condition_pattern: Regex::new("^\\s").unwrap(),
647 mode: Mode::ContinueThrough,
648 timeout: Duration::from_millis(10),
649 };
650 let expected = vec![
651 ("not merged 1", 0, None),
652 (" merged 1\n merged 2", 1, Some(2)),
653 ("not merged 2", 3, None),
654 (" merged 3\n merged 4", 4, Some(5)),
655 ("not merged 3", 6, None),
656 ("not merged 4", 7, None),
657 (" merged 5", 8, None),
658 ("not merged 5", 9, None),
659 (" merged 6\n merged 7\n merged 8", 10, Some(12)),
660 ("not merged 6", 13, None),
661 ];
662 run_and_assert(&lines, config, &expected).await;
663 }
664
665 #[tokio::test]
666 async fn two_lines_emit_with_halt_before() {
667 let lines = vec![
668 "part 0.1",
669 "part 0.2",
670 "START msg 1", "part 1.1",
672 "part 1.2",
673 "START msg 2", "START msg 3", "part 3.1",
676 "START msg 4", "part 4.1",
678 "part 4.2",
679 "part 4.3",
680 "START msg 5", ];
682 let config = Config {
683 start_pattern: Regex::new("").unwrap(),
684 condition_pattern: Regex::new("^START ").unwrap(),
685 mode: Mode::HaltBefore,
686 timeout: Duration::from_millis(10),
687 };
688 let expected = vec![
689 ("part 0.1\npart 0.2", 0, Some(1)),
690 ("START msg 1\npart 1.1\npart 1.2", 2, Some(4)),
691 ("START msg 2", 5, None),
692 ("START msg 3\npart 3.1", 6, Some(7)),
693 ("START msg 4\npart 4.1\npart 4.2\npart 4.3", 8, Some(11)),
694 ("START msg 5", 12, None),
695 ];
696 run_and_assert(&lines, config, &expected).await;
697 }
698
699 #[tokio::test]
700 async fn legacy() {
701 let lines = vec![
702 "INFO some usual line",
703 "INFO some other usual line",
704 "INFO first part",
705 "second part",
706 "last part",
707 "ERROR another normal message",
708 "ERROR finishing message",
709 "last part of the incomplete finishing message",
710 ];
711 let expected = vec![
712 ("INFO some usual line", 0, None),
713 ("INFO some other usual line", 1, None),
714 (
715 concat!("INFO first part\n", "second part\n", "last part"),
716 2,
717 Some(4),
718 ),
719 ("ERROR another normal message", 5, None),
720 (
721 concat!(
722 "ERROR finishing message\n",
723 "last part of the incomplete finishing message"
724 ),
725 6,
726 Some(7),
727 ),
728 ];
729
730 let stream = stream_from_lines(&lines);
731 let line_agg = LineAgg::new(
732 stream,
733 Logic::new(Config::for_legacy(
734 Regex::new("^(INFO|ERROR)").unwrap(), 10,
736 )),
737 );
738 let results = line_agg.collect().await;
739 assert_results(results, &expected);
740 }
741
742 #[tokio::test]
743 async fn timeout_resets_on_new_line() {
744 let n: usize = 1000;
751 let mut lines = vec![
752 "START msg 1".to_string(), ];
754 for i in 0..n {
755 lines.push(format!("line {i}"));
756 }
757 let config = Config {
758 start_pattern: Regex::new("").unwrap(),
759 condition_pattern: Regex::new("^START ").unwrap(),
760 mode: Mode::HaltBefore,
761 timeout: Duration::from_millis(10),
762 };
763
764 let mut expected = "START msg 1".to_string();
765 for i in 0..n {
766 write!(expected, "\nline {i}").expect("write to String never fails");
767 }
768
769 let (mut send, recv) = futures::channel::mpsc::unbounded();
770
771 let logic = Logic::new(config);
772 let line_agg = LineAgg::new(recv, logic);
773 let results = tokio::spawn(line_agg.collect());
774
775 for (index, line) in lines.iter().enumerate() {
776 let data = (
777 "test.log".to_owned(),
778 Bytes::copy_from_slice(line.as_bytes()),
779 index,
780 );
781 send.send(data).await.unwrap();
782 tokio::time::sleep(Duration::from_millis(1)).await;
783 }
784 drop(send);
785
786 assert_results(
787 results.await.unwrap(),
788 &[(expected.as_str(), 0, Some(lines.len() - 1))],
789 );
790 }
791
    /// Source key used by the tests; every test line is attributed to the
    /// same `"test.log"` name.
    type Filename = String;
796
797 fn stream_from_lines<'a>(
798 lines: &'a [&'static str],
799 ) -> impl Stream<Item = (Filename, Bytes, usize)> + 'a {
800 futures::stream::iter(lines.iter().enumerate().map(|(index, line)| {
801 (
802 "test.log".to_owned(),
803 Bytes::from_static(line.as_bytes()),
804 index,
805 )
806 }))
807 }
808
809 fn assert_results(
811 actual: Vec<(Filename, Bytes, usize, Option<usize>)>,
812 expected: &[(&str, usize, Option<usize>)],
813 ) {
814 let expected_mapped: Vec<(Filename, Bytes, usize, Option<usize>)> = expected
815 .iter()
816 .map(|(line, context, last_context)| {
817 (
818 "test.log".to_owned(),
819 Bytes::copy_from_slice(line.as_bytes()),
820 *context,
821 *last_context,
822 )
823 })
824 .collect();
825
826 assert_eq!(
827 actual, expected_mapped,
828 "actual on the left, expected on the right",
829 );
830 }
831
832 async fn run_and_assert(
833 lines: &[&'static str],
834 config: Config,
835 expected: &[(&'static str, usize, Option<usize>)],
836 ) {
837 let stream = stream_from_lines(lines);
838 let logic = Logic::new(config);
839 let line_agg = LineAgg::new(stream, logic);
840 let results = line_agg.collect().await;
841 assert_results(results, expected);
842 }
843}