vector/
line_agg.rs

1//! A reusable line aggregation implementation.
2
3#![deny(missing_docs)]
4
5use std::{
6    collections::{hash_map::Entry, HashMap},
7    hash::Hash,
8    pin::Pin,
9    task::{Context, Poll},
10    time::Duration,
11};
12
13use bytes::{Bytes, BytesMut};
14use futures::{Stream, StreamExt};
15use pin_project::pin_project;
16use regex::bytes::Regex;
17use tokio_util::time::delay_queue::{DelayQueue, Key};
18use vector_lib::configurable::configurable_component;
19
20/// Mode of operation of the line aggregator.
21#[configurable_component]
22#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
23#[serde(rename_all = "snake_case")]
24pub enum Mode {
25    /// All consecutive lines matching this pattern are included in the group.
26    ///
27    /// The first line (the line that matched the start pattern) does not need to match the `ContinueThrough` pattern.
28    ///
29    /// This is useful in cases such as a Java stack trace, where some indicator in the line (such as a leading
30    /// whitespace) indicates that it is an extension of the proceeding line.
31    ContinueThrough,
32
33    /// All consecutive lines matching this pattern, plus one additional line, are included in the group.
34    ///
35    /// This is useful in cases where a log message ends with a continuation marker, such as a backslash, indicating
36    /// that the following line is part of the same message.
37    ContinuePast,
38
39    /// All consecutive lines not matching this pattern are included in the group.
40    ///
41    /// This is useful where a log line contains a marker indicating that it begins a new message.
42    HaltBefore,
43
44    /// All consecutive lines, up to and including the first line matching this pattern, are included in the group.
45    ///
46    /// This is useful where a log line ends with a termination marker, such as a semicolon.
47    HaltWith,
48}
49
50/// Configuration of multi-line aggregation.
51#[derive(Clone, Debug)]
52pub struct Config {
53    /// Regular expression pattern that is used to match the start of a new message.
54    pub start_pattern: Regex,
55
56    /// Regular expression pattern that is used to determine whether or not more lines should be read.
57    ///
58    /// This setting must be configured in conjunction with `mode`.
59    pub condition_pattern: Regex,
60
61    /// Aggregation mode.
62    ///
63    /// This setting must be configured in conjunction with `condition_pattern`.
64    pub mode: Mode,
65
66    /// The maximum amount of time to wait for the next additional line, in milliseconds.
67    ///
68    /// Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete.
69    pub timeout: Duration,
70}
71
72impl Config {
73    /// Build `Config` from legacy `file` source line aggregator configuration
74    /// params.
75    pub fn for_legacy(marker: Regex, timeout_ms: u64) -> Self {
76        let start_pattern = marker;
77        let condition_pattern = start_pattern.clone();
78        let mode = Mode::HaltBefore;
79        let timeout = Duration::from_millis(timeout_ms);
80
81        Self {
82            start_pattern,
83            condition_pattern,
84            mode,
85            timeout,
86        }
87    }
88}
89
/// Line aggregator.
///
/// Provides a `Stream` implementation that reads lines from the `inner` stream
/// and yields aggregated lines.
///
/// Type parameters:
/// - `T` is the wrapped stream.
/// - `K` identifies the source of each line; buffering is done per `K`.
/// - `C` is a caller-supplied context value attached to every line.
#[pin_project(project = LineAggProj)]
pub struct LineAgg<T, K, C> {
    /// The stream from which we read the lines.
    #[pin]
    inner: T,

    /// The core line aggregation logic.
    logic: Logic<K, C>,

    /// Stashed lines. When line aggregation results in more than one line being
    /// emitted, we have to stash lines and return them into the stream after
    /// that before doing any other work.
    ///
    /// At most one line is held here. It is fed back through the aggregation
    /// logic on the next poll-loop iteration, before `inner` is polled again.
    stashed: Option<(K, Bytes, C)>,

    /// Draining queue. We switch to draining mode when we get `None` from
    /// the inner stream. In this mode we stop polling `inner` for new lines
    /// and just flush all the buffered data.
    ///
    /// Entries are popped from the end of the `Vec`. Their order comes from
    /// `HashMap` iteration and is therefore unspecified.
    draining: Option<Vec<(K, Bytes, C, Option<C>)>>,
}
113
/// Core line aggregation logic.
///
/// Encapsulates the essential state and the core logic for the line
/// aggregation algorithm.
pub struct Logic<K, C> {
    /// Configuration parameters to use.
    config: Config,

    /// In-progress aggregate per key, paired with the key of its entry in
    /// `timeouts`.
    /// Key is usually a filename or other line source identifier.
    buffers: HashMap<K, (Key, Aggregate<C>)>,

    /// A queue of key timeouts.
    ///
    /// An entry is inserted when an aggregate is opened and reset each time a
    /// line is added to it. It is removed when the aggregate is emitted by
    /// `handle_line`. An expired entry causes the matching buffer to be flushed.
    timeouts: DelayQueue<K>,
}
129
130impl<K, C> Logic<K, C> {
131    /// Create a new `Logic` using the specified `Config`.
132    pub fn new(config: Config) -> Self {
133        Self {
134            config,
135            buffers: HashMap::new(),
136            timeouts: DelayQueue::new(),
137        }
138    }
139}
140
141impl<T, K, C> LineAgg<T, K, C>
142where
143    T: Stream<Item = (K, Bytes, C)> + Unpin,
144    K: Hash + Eq + Clone,
145{
146    /// Create a new `LineAgg` using the specified `inner` stream and
147    /// preconfigured `logic`.
148    pub const fn new(inner: T, logic: Logic<K, C>) -> Self {
149        Self {
150            inner,
151            logic,
152            draining: None,
153            stashed: None,
154        }
155    }
156}
157
158impl<T, K, C> Stream for LineAgg<T, K, C>
159where
160    T: Stream<Item = (K, Bytes, C)> + Unpin,
161    K: Hash + Eq + Clone,
162{
163    /// `K` - file name, or other line source,
164    /// `Bytes` - the line data,
165    /// `C` - the initial context related to the first line of data.
166    /// `Option<C>` - context related to the last-seen line data.
167    type Item = (K, Bytes, C, Option<C>);
168
169    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170        let mut this = self.project();
171        loop {
172            // If we have a stashed line, process it before doing anything else.
173            if let Some((src, line, context)) = this.stashed.take() {
174                // Handle the stashed line. If the handler gave us something -
175                // return it, otherwise restart the loop iteration to start
176                // anew. Handler could've stashed another value, continuing to
177                // the new loop iteration handles that.
178                if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context) {
179                    return Poll::Ready(Some(val));
180                }
181                continue;
182            }
183
184            // If we're in draining mode, short circuit here.
185            if let Some(to_drain) = &mut this.draining {
186                match to_drain.pop() {
187                    Some(val) => {
188                        return Poll::Ready(Some(val));
189                    }
190                    _ => {
191                        return Poll::Ready(None);
192                    }
193                }
194            }
195
196            match this.inner.poll_next_unpin(cx) {
197                Poll::Ready(Some((src, line, context))) => {
198                    // Handle the incoming line we got from `inner`. If the
199                    // handler gave us something - return it, otherwise continue
200                    // with the flow.
201                    if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context)
202                    {
203                        return Poll::Ready(Some(val));
204                    }
205                }
206                Poll::Ready(None) => {
207                    // We got `None`, this means the `inner` stream has ended.
208                    // Start flushing all existing data, stop polling `inner`.
209                    *this.draining = Some(
210                        this.logic
211                            .buffers
212                            .drain()
213                            .map(|(src, (_, aggregate))| {
214                                let (line, initial_context, last_context) = aggregate.merge();
215                                (src, line, initial_context, last_context)
216                            })
217                            .collect(),
218                    );
219                }
220                Poll::Pending => {
221                    // We didn't get any lines from `inner`, so we just give
222                    // a line from keys that have hit their timeout.
223                    while let Poll::Ready(Some(expired_key)) = this.logic.timeouts.poll_expired(cx)
224                    {
225                        let key = expired_key.into_inner();
226                        if let Some((_, aggregate)) = this.logic.buffers.remove(&key) {
227                            let (line, initial_context, last_context) = aggregate.merge();
228                            return Poll::Ready(Some((key, line, initial_context, last_context)));
229                        }
230                    }
231
232                    return Poll::Pending;
233                }
234            };
235        }
236    }
237}
238
239impl<T, K, C> LineAgg<T, K, C>
240where
241    T: Stream<Item = (K, Bytes, C)> + Unpin,
242    K: Hash + Eq + Clone,
243{
244    /// Handle line and do stashing of extra emitted lines.
245    /// Requires that the `stashed` item is empty (i.e. entry is vacant). This
246    /// invariant has to be taken care of by the caller.
247    fn handle_line_and_stashing(
248        this: &mut LineAggProj<'_, T, K, C>,
249        src: K,
250        line: Bytes,
251        context: C,
252    ) -> Option<(K, Bytes, C, Option<C>)> {
253        // Stashed line is always consumed at the start of the `poll`
254        // loop before entering this line processing logic. If it's
255        // non-empty here - it's a bug.
256        debug_assert!(this.stashed.is_none());
257        let val = this.logic.handle_line(src, line, context)?;
258        let val = match val {
259            // If we have to emit just one line - that's easy,
260            // we just return it.
261            (src, Emit::One((line, initial_context, last_context))) => {
262                (src, line, initial_context, last_context)
263            }
264            // If we have to emit two lines - take the second
265            // one and stash it, then return the first one.
266            // This way, the stashed line will be returned
267            // on the next stream poll.
268            (
269                src,
270                Emit::Two(
271                    (line, initial_context, last_context),
272                    (line_to_stash, context_to_stash, _),
273                ),
274            ) => {
275                *this.stashed = Some((src.clone(), line_to_stash, context_to_stash));
276                (src, line, initial_context, last_context)
277            }
278        };
279        Some(val)
280    }
281}
282
/// Specifies the amount of lines to emit in response to a single input line.
///
/// Handling one input line yields either one or two output lines.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Emit<T> {
    /// Emit one line.
    One(T),
    /// Emit two lines, in the order they're specified.
    Two(T, T),
}
291
/// What to do with the aggregate in progress when a follow-up line arrives
/// for its source.
///
/// The decision is derived from the configured [`Mode`] and from whether the
/// line matched the condition pattern.
enum Decision {
    /// Append the line to the aggregate and keep waiting for more lines.
    Continue,
    /// Append the line to the aggregate, then emit the aggregate.
    EndInclude,
    /// Emit the aggregate without the line; the line is emitted separately after it.
    EndExclude,
}
298
299impl<K, C> Logic<K, C>
300where
301    K: Hash + Eq + Clone,
302{
303    /// Handle line, if we have something to output - return it.
304    pub fn handle_line(
305        &mut self,
306        src: K,
307        line: Bytes,
308        context: C,
309    ) -> Option<(K, Emit<(Bytes, C, Option<C>)>)> {
310        // Check if we already have the buffered data for the source.
311        match self.buffers.entry(src) {
312            Entry::Occupied(mut entry) => {
313                let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
314                let decision = match (self.config.mode, condition_matched) {
315                    // All consecutive lines matching this pattern are included in
316                    // the group.
317                    (Mode::ContinueThrough, true) => Decision::Continue,
318                    (Mode::ContinueThrough, false) => Decision::EndExclude,
319                    // All consecutive lines matching this pattern, plus one
320                    // additional line, are included in the group.
321                    (Mode::ContinuePast, true) => Decision::Continue,
322                    (Mode::ContinuePast, false) => Decision::EndInclude,
323                    // All consecutive lines not matching this pattern are included
324                    // in the group.
325                    (Mode::HaltBefore, true) => Decision::EndExclude,
326                    (Mode::HaltBefore, false) => Decision::Continue,
327                    // All consecutive lines, up to and including the first line
328                    // matching this pattern, are included in the group.
329                    (Mode::HaltWith, true) => Decision::EndInclude,
330                    (Mode::HaltWith, false) => Decision::Continue,
331                };
332
333                match decision {
334                    Decision::Continue => {
335                        let buffered = entry.get_mut();
336                        self.timeouts.reset(&buffered.0, self.config.timeout);
337                        buffered.1.add_next_line(line, context);
338                        None
339                    }
340                    Decision::EndInclude => {
341                        let (src, (key, mut buffered)) = entry.remove_entry();
342                        self.timeouts.remove(&key);
343                        buffered.add_next_line(line, context);
344                        Some((src, Emit::One(buffered.merge())))
345                    }
346                    Decision::EndExclude => {
347                        let (src, (key, buffered)) = entry.remove_entry();
348                        self.timeouts.remove(&key);
349                        Some((src, Emit::Two(buffered.merge(), (line, context, None))))
350                    }
351                }
352            }
353            Entry::Vacant(entry) => {
354                // This line is a candidate for buffering, or passing through.
355                if self.config.start_pattern.is_match(line.as_ref()) {
356                    // It was indeed a new line we need to filter.
357                    // Set the timeout and buffer this line.
358                    let key = self
359                        .timeouts
360                        .insert(entry.key().clone(), self.config.timeout);
361                    entry.insert((key, Aggregate::new(line, context)));
362                    None
363                } else {
364                    // It's just a regular line we don't really care about.
365                    Some((entry.into_key(), Emit::One((line, context, None))))
366                }
367            }
368        }
369    }
370}
371
/// Lines buffered for one source while a multi-line message is being assembled.
struct Aggregate<C> {
    /// Buffered lines in arrival order. Never empty, since an aggregate is
    /// created from its first line.
    lines: Vec<Bytes>,
    /// Context of the first line of the message.
    initial_context: C,
    /// Context of the most recently added line; `None` while only the first
    /// line is buffered.
    last_context: Option<C>,
}
377
378impl<C> Aggregate<C> {
379    fn new(first_line: Bytes, initial_context: C) -> Self {
380        Self {
381            lines: vec![first_line],
382            initial_context,
383            last_context: None,
384        }
385    }
386
387    fn add_next_line(&mut self, line: Bytes, context: C) {
388        self.last_context = Some(context);
389        self.lines.push(line);
390    }
391
392    fn merge(self) -> (Bytes, C, Option<C>) {
393        let capacity = self.lines.iter().map(|line| line.len() + 1).sum::<usize>() - 1;
394        let mut bytes_mut = BytesMut::with_capacity(capacity);
395        let mut first = true;
396        for line in self.lines {
397            if first {
398                first = false;
399            } else {
400                bytes_mut.extend_from_slice(b"\n");
401            }
402            bytes_mut.extend_from_slice(&line);
403        }
404        (bytes_mut.freeze(), self.initial_context, self.last_context)
405    }
406}
407
#[cfg(test)]
mod tests {
    // Each test feeds numbered lines from a single "test.log" source into the
    // aggregator; the context of every line is its index. Expected results are
    // (line, initial context, last context) tuples, see `assert_results`.
    use bytes::Bytes;
    use futures::SinkExt;
    use similar_asserts::assert_eq;
    use std::fmt::Write as _;

    use super::*;

    #[tokio::test]
    async fn mode_continue_through_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part",
            " second part",
            " last part",
            "another normal message",
            "finishing message",
            " last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part\n", " second part\n", " last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message\n",
                    " last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn mode_continue_past_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part \\",
            "second part \\",
            "last part",
            "another normal message",
            "finishing message \\",
            "last part of the incomplete finishing message \\",
        ];
        let config = Config {
            start_pattern: Regex::new("\\\\$").unwrap(),
            condition_pattern: Regex::new("\\\\$").unwrap(),
            mode: Mode::ContinuePast,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part \\\n", "second part \\\n", "last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message \\\n",
                    "last part of the incomplete finishing message \\"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn mode_halt_before_1() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^(INFO|ERROR) ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn mode_halt_with_1() {
        let lines = vec![
            "some usual line;",
            "some other usual line;",
            "first part",
            "second part",
            "last part;",
            "another normal message;",
            "finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("[^;]$").unwrap(),
            condition_pattern: Regex::new(";$").unwrap(),
            mode: Mode::HaltWith,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("some usual line;", 0, None),
            ("some other usual line;", 1, None),
            (
                concat!("first part\n", "second part\n", "last part;"),
                2,
                Some(4),
            ),
            ("another normal message;", 5, None),
            (
                concat!(
                    "finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn use_case_java_exception() {
        let lines = vec![
            "java.lang.Exception",
            "    at com.foo.bar(bar.java:123)",
            "    at com.foo.baz(baz.java:456)",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+at").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "java.lang.Exception\n",
                "    at com.foo.bar(bar.java:123)\n",
                "    at com.foo.baz(baz.java:456)",
            ),
            0,
            Some(2),
        )];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn use_case_ruby_exception() {
        let lines = vec![
            "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
            "\tfrom foobar.rb:6:in `bar'",
            "\tfrom foobar.rb:2:in `foo'",
            "\tfrom foobar.rb:9:in `<main>'",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+from").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n",
                "\tfrom foobar.rb:6:in `bar'\n",
                "\tfrom foobar.rb:2:in `foo'\n",
                "\tfrom foobar.rb:9:in `<main>'",
            ),
            0,
            Some(3),
        )];
        run_and_assert(&lines, config, &expected).await;
    }

    /// https://github.com/vectordotdev/vector/issues/3237
    #[tokio::test]
    async fn two_lines_emit_with_continue_through() {
        let lines = vec![
            "not merged 1", // will NOT be stashed, but passed-through
            " merged 1",
            " merged 2",
            "not merged 2", // will be stashed
            " merged 3",
            " merged 4",
            "not merged 3", // will be stashed
            "not merged 4", // will NOT be stashed, but passed-through
            " merged 5",
            "not merged 5", // will be stashed
            " merged 6",
            " merged 7",
            " merged 8",
            "not merged 6", // will be stashed
        ];
        let config = Config {
            start_pattern: Regex::new("^\\s").unwrap(),
            condition_pattern: Regex::new("^\\s").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("not merged 1", 0, None),
            (" merged 1\n merged 2", 1, Some(2)),
            ("not merged 2", 3, None),
            (" merged 3\n merged 4", 4, Some(5)),
            ("not merged 3", 6, None),
            ("not merged 4", 7, None),
            (" merged 5", 8, None),
            ("not merged 5", 9, None),
            (" merged 6\n merged 7\n merged 8", 10, Some(12)),
            ("not merged 6", 13, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn two_lines_emit_with_halt_before() {
        let lines = vec![
            "part 0.1",
            "part 0.2",
            "START msg 1", // will be stashed
            "part 1.1",
            "part 1.2",
            "START msg 2", // will be stashed
            "START msg 3", // will be stashed
            "part 3.1",
            "START msg 4", // will be stashed
            "part 4.1",
            "part 4.2",
            "part 4.3",
            "START msg 5", // will be stashed
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^START ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![
            ("part 0.1\npart 0.2", 0, Some(1)),
            ("START msg 1\npart 1.1\npart 1.2", 2, Some(4)),
            ("START msg 2", 5, None),
            ("START msg 3\npart 3.1", 6, Some(7)),
            ("START msg 4\npart 4.1\npart 4.2\npart 4.3", 8, Some(11)),
            ("START msg 5", 12, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }

    #[tokio::test]
    async fn legacy() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];

        let stream = stream_from_lines(&lines);
        let line_agg = LineAgg::new(
            stream,
            Logic::new(Config::for_legacy(
                Regex::new("^(INFO|ERROR)").unwrap(), // example from the docs
                10,
            )),
        );
        let results = line_agg.collect().await;
        assert_results(results, &expected);
    }

    #[tokio::test]
    async fn timeout_resets_on_new_line() {
        // Tests that multiline aggregation resets
        // its timeout every time it gets a new line.
        // To test this we emit a single large
        // multiline message, drip-feeding it into the
        // aggregator with a 1ms delay between lines;
        // the total duration far exceeds the 10ms timeout.

        let n: usize = 1000;
        let mut lines = vec![
            "START msg 1".to_string(), // opens the aggregate; nothing is buffered yet, so it is not stashed
        ];
        for i in 0..n {
            lines.push(format!("line {i}"));
        }
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^START ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };

        let mut expected = "START msg 1".to_string();
        for i in 0..n {
            write!(expected, "\nline {i}").expect("write to String never fails");
        }

        let (mut send, recv) = futures::channel::mpsc::unbounded();

        let logic = Logic::new(config);
        let line_agg = LineAgg::new(recv, logic);
        let results = tokio::spawn(line_agg.collect());

        for (index, line) in lines.iter().enumerate() {
            let data = (
                "test.log".to_owned(),
                Bytes::copy_from_slice(line.as_bytes()),
                index,
            );
            send.send(data).await.unwrap();
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        // Closing the channel ends the input stream, which flushes the aggregate.
        drop(send);

        assert_results(
            results.await.unwrap(),
            &[(expected.as_str(), 0, Some(lines.len() - 1))],
        );
    }

    // Test helpers.

    /// Private type alias to be more expressive in the internal implementation.
    type Filename = String;

    /// Builds a stream yielding `("test.log", line, index)` for each of `lines`.
    fn stream_from_lines<'a>(
        lines: &'a [&'static str],
    ) -> impl Stream<Item = (Filename, Bytes, usize)> + 'a {
        futures::stream::iter(lines.iter().enumerate().map(|(index, line)| {
            (
                "test.log".to_owned(),
                Bytes::from_static(line.as_bytes()),
                index,
            )
        }))
    }

    /// Compare actual output to expected; expected is a list of
    /// (line, initial context, last context) tuples, all from "test.log".
    fn assert_results(
        actual: Vec<(Filename, Bytes, usize, Option<usize>)>,
        expected: &[(&str, usize, Option<usize>)],
    ) {
        let expected_mapped: Vec<(Filename, Bytes, usize, Option<usize>)> = expected
            .iter()
            .map(|(line, context, last_context)| {
                (
                    "test.log".to_owned(),
                    Bytes::copy_from_slice(line.as_bytes()),
                    *context,
                    *last_context,
                )
            })
            .collect();

        assert_eq!(
            actual, expected_mapped,
            "actual on the left, expected on the right",
        );
    }

    /// Runs `lines` through a `LineAgg` built from `config` and checks the
    /// collected output against `expected`.
    async fn run_and_assert(
        lines: &[&'static str],
        config: Config,
        expected: &[(&'static str, usize, Option<usize>)],
    ) {
        let stream = stream_from_lines(lines);
        let logic = Logic::new(config);
        let line_agg = LineAgg::new(stream, logic);
        let results = line_agg.collect().await;
        assert_results(results, expected);
    }
}