vector/
line_agg.rs

1//! A reusable line aggregation implementation.
2
3#![deny(missing_docs)]
4
5use std::{
6    collections::{HashMap, hash_map::Entry},
7    hash::Hash,
8    pin::Pin,
9    task::{Context, Poll},
10    time::Duration,
11};
12
13use bytes::{Bytes, BytesMut};
14use futures::{Stream, StreamExt};
15use pin_project::pin_project;
16use regex::bytes::Regex;
17use tokio_util::time::delay_queue::{DelayQueue, Key};
18use vector_lib::configurable::configurable_component;
19
/// Mode of operation of the line aggregator.
#[configurable_component]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Mode {
    /// All consecutive lines matching this pattern are included in the group.
    ///
    /// The first line (the line that matched the start pattern) does not need to match the `ContinueThrough` pattern.
    ///
    /// This is useful in cases such as a Java stack trace, where some indicator in the line (such as a leading
    /// whitespace) indicates that it is an extension of the preceding line.
    ContinueThrough,

    /// All consecutive lines matching this pattern, plus one additional line, are included in the group.
    ///
    /// This is useful in cases where a log message ends with a continuation marker, such as a backslash, indicating
    /// that the following line is part of the same message.
    ContinuePast,

    /// All consecutive lines not matching this pattern are included in the group.
    ///
    /// This is useful where a log line contains a marker indicating that it begins a new message.
    HaltBefore,

    /// All consecutive lines, up to and including the first line matching this pattern, are included in the group.
    ///
    /// This is useful where a log line ends with a termination marker, such as a semicolon.
    HaltWith,
}
49
/// Configuration of multi-line aggregation.
///
/// Consumed by [`Logic`]: `start_pattern` is tested against lines that arrive
/// while nothing is buffered for their source, and `condition_pattern`
/// (interpreted according to `mode`) is tested against lines that arrive
/// while an aggregate is already being buffered.
#[derive(Clone, Debug)]
pub struct Config {
    /// Regular expression pattern that is used to match the start of a new message.
    ///
    /// A line that does not match while nothing is buffered for its source is
    /// emitted on its own, without being aggregated.
    pub start_pattern: Regex,

    /// Regular expression pattern that is used to determine whether or not more lines should be read.
    ///
    /// This setting must be configured in conjunction with `mode`.
    pub condition_pattern: Regex,

    /// Aggregation mode.
    ///
    /// This setting must be configured in conjunction with `condition_pattern`.
    pub mode: Mode,

    /// The maximum amount of time to wait for the next additional line.
    ///
    /// The timer is restarted every time a line is appended to the buffered
    /// message. Once it expires, the buffered message is flushed, even if
    /// incomplete. When driven by `LineAgg`, expired timers are only checked
    /// while the inner stream has no line ready.
    pub timeout: Duration,
}
71
72impl Config {
73    /// Build `Config` from legacy `file` source line aggregator configuration
74    /// params.
75    pub fn for_legacy(marker: Regex, timeout_ms: u64) -> Self {
76        let start_pattern = marker;
77        let condition_pattern = start_pattern.clone();
78        let mode = Mode::HaltBefore;
79        let timeout = Duration::from_millis(timeout_ms);
80
81        Self {
82            start_pattern,
83            condition_pattern,
84            mode,
85            timeout,
86        }
87    }
88}
89
/// Line aggregator.
///
/// Provides a `Stream` implementation that reads lines from the `inner` stream
/// and yields aggregated lines.
///
/// Type parameters:
/// - `T` - the inner stream, yielding `(K, Bytes, C)` items;
/// - `K` - the line source identifier (for example a file name);
/// - `C` - the context attached to every input line.
#[pin_project(project = LineAggProj)]
pub struct LineAgg<T, K, C> {
    /// The stream from which we read the lines.
    #[pin]
    inner: T,

    /// The core line aggregation logic.
    logic: Logic<K, C>,

    /// Stashed line. When handling a single input line results in two lines
    /// being emitted, the second one is stashed here and fed back into the
    /// aggregation logic on the next iteration, before doing any other work.
    stashed: Option<(K, Bytes, C)>,

    /// Draining queue. We switch to draining mode when we get `None` from
    /// the inner stream. In this mode we stop polling `inner` for new lines
    /// and just flush all the buffered data.
    ///
    /// `None` means the inner stream has not ended yet.
    draining: Option<Vec<(K, Bytes, C, Option<C>)>>,
}
113
/// Core line aggregation logic.
///
/// Encapsulates the essential state and the core logic for the line
/// aggregation algorithm.
///
/// `K` identifies the source of a line and `C` is the per-line context.
pub struct Logic<K, C> {
    /// Configuration parameters to use.
    config: Config,

    /// In-progress aggregate per key, stored together with the `DelayQueue`
    /// key of the timeout registered for it in `timeouts`.
    /// Key is usually a filename or other line source identifier.
    buffers: HashMap<K, (Key, Aggregate<C>)>,

    /// A queue of key timeouts.
    timeouts: DelayQueue<K>,
}
129
130impl<K, C> Logic<K, C> {
131    /// Create a new `Logic` using the specified `Config`.
132    pub fn new(config: Config) -> Self {
133        Self {
134            config,
135            buffers: HashMap::new(),
136            timeouts: DelayQueue::new(),
137        }
138    }
139}
140
141impl<T, K, C> LineAgg<T, K, C>
142where
143    T: Stream<Item = (K, Bytes, C)> + Unpin,
144    K: Hash + Eq + Clone,
145{
146    /// Create a new `LineAgg` using the specified `inner` stream and
147    /// preconfigured `logic`.
148    pub const fn new(inner: T, logic: Logic<K, C>) -> Self {
149        Self {
150            inner,
151            logic,
152            draining: None,
153            stashed: None,
154        }
155    }
156}
157
158impl<T, K, C> Stream for LineAgg<T, K, C>
159where
160    T: Stream<Item = (K, Bytes, C)> + Unpin,
161    K: Hash + Eq + Clone,
162{
163    /// `K` - file name, or other line source,
164    /// `Bytes` - the line data,
165    /// `C` - the initial context related to the first line of data.
166    /// `Option<C>` - context related to the last-seen line data.
167    type Item = (K, Bytes, C, Option<C>);
168
169    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
170        let mut this = self.project();
171        loop {
172            // If we have a stashed line, process it before doing anything else.
173            if let Some((src, line, context)) = this.stashed.take() {
174                // Handle the stashed line. If the handler gave us something -
175                // return it, otherwise restart the loop iteration to start
176                // anew. Handler could've stashed another value, continuing to
177                // the new loop iteration handles that.
178                if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context) {
179                    return Poll::Ready(Some(val));
180                }
181                continue;
182            }
183
184            // If we're in draining mode, short circuit here.
185            if let Some(to_drain) = &mut this.draining {
186                match to_drain.pop() {
187                    Some(val) => {
188                        return Poll::Ready(Some(val));
189                    }
190                    _ => {
191                        return Poll::Ready(None);
192                    }
193                }
194            }
195
196            match this.inner.poll_next_unpin(cx) {
197                Poll::Ready(Some((src, line, context))) => {
198                    // Handle the incoming line we got from `inner`. If the
199                    // handler gave us something - return it, otherwise continue
200                    // with the flow.
201                    if let Some(val) = Self::handle_line_and_stashing(&mut this, src, line, context)
202                    {
203                        return Poll::Ready(Some(val));
204                    }
205                }
206                Poll::Ready(None) => {
207                    // We got `None`, this means the `inner` stream has ended.
208                    // Start flushing all existing data, stop polling `inner`.
209                    *this.draining = Some(
210                        this.logic
211                            .buffers
212                            .drain()
213                            .map(|(src, (_, aggregate))| {
214                                let (line, initial_context, last_context) = aggregate.merge();
215                                (src, line, initial_context, last_context)
216                            })
217                            .collect(),
218                    );
219                }
220                Poll::Pending => {
221                    // We didn't get any lines from `inner`, so we just give
222                    // a line from keys that have hit their timeout.
223                    while let Poll::Ready(Some(expired_key)) = this.logic.timeouts.poll_expired(cx)
224                    {
225                        let key = expired_key.into_inner();
226                        if let Some((_, aggregate)) = this.logic.buffers.remove(&key) {
227                            let (line, initial_context, last_context) = aggregate.merge();
228                            return Poll::Ready(Some((key, line, initial_context, last_context)));
229                        }
230                    }
231
232                    return Poll::Pending;
233                }
234            };
235        }
236    }
237}
238
239impl<T, K, C> LineAgg<T, K, C>
240where
241    T: Stream<Item = (K, Bytes, C)> + Unpin,
242    K: Hash + Eq + Clone,
243{
244    /// Handle line and do stashing of extra emitted lines.
245    /// Requires that the `stashed` item is empty (i.e. entry is vacant). This
246    /// invariant has to be taken care of by the caller.
247    fn handle_line_and_stashing(
248        this: &mut LineAggProj<'_, T, K, C>,
249        src: K,
250        line: Bytes,
251        context: C,
252    ) -> Option<(K, Bytes, C, Option<C>)> {
253        // Stashed line is always consumed at the start of the `poll`
254        // loop before entering this line processing logic. If it's
255        // non-empty here - it's a bug.
256        debug_assert!(this.stashed.is_none());
257        let val = this.logic.handle_line(src, line, context)?;
258        let val = match val {
259            // If we have to emit just one line - that's easy,
260            // we just return it.
261            (src, Emit::One((line, initial_context, last_context))) => {
262                (src, line, initial_context, last_context)
263            }
264            // If we have to emit two lines - take the second
265            // one and stash it, then return the first one.
266            // This way, the stashed line will be returned
267            // on the next stream poll.
268            (
269                src,
270                Emit::Two(
271                    (line, initial_context, last_context),
272                    (line_to_stash, context_to_stash, _),
273                ),
274            ) => {
275                *this.stashed = Some((src.clone(), line_to_stash, context_to_stash));
276                (src, line, initial_context, last_context)
277            }
278        };
279        Some(val)
280    }
281}
282
/// Specifies the number of lines to emit in response to a single input line.
/// We have to emit either one or two lines.
///
/// This is the payload of the value returned by `Logic::handle_line`.
pub enum Emit<T> {
    /// Emit one line.
    One(T),
    /// Emit two lines, in the order they're specified.
    Two(T, T),
}
291
/// A helper enum describing what to do with a line that arrives while an
/// aggregate is already being buffered for its source.
enum Decision {
    /// Append the line to the aggregate and keep buffering.
    Continue,
    /// Append the line to the aggregate, then emit the aggregate.
    EndInclude,
    /// Emit the aggregate without the line; the line is emitted separately,
    /// after the aggregate.
    EndExclude,
}
298
299impl<K, C> Logic<K, C>
300where
301    K: Hash + Eq + Clone,
302{
303    /// Handle line, if we have something to output - return it.
304    pub fn handle_line(
305        &mut self,
306        src: K,
307        line: Bytes,
308        context: C,
309    ) -> Option<(K, Emit<(Bytes, C, Option<C>)>)> {
310        // Check if we already have the buffered data for the source.
311        match self.buffers.entry(src) {
312            Entry::Occupied(mut entry) => {
313                let condition_matched = self.config.condition_pattern.is_match(line.as_ref());
314                let decision = match (self.config.mode, condition_matched) {
315                    // All consecutive lines matching this pattern are included in
316                    // the group.
317                    (Mode::ContinueThrough, true) => Decision::Continue,
318                    (Mode::ContinueThrough, false) => Decision::EndExclude,
319                    // All consecutive lines matching this pattern, plus one
320                    // additional line, are included in the group.
321                    (Mode::ContinuePast, true) => Decision::Continue,
322                    (Mode::ContinuePast, false) => Decision::EndInclude,
323                    // All consecutive lines not matching this pattern are included
324                    // in the group.
325                    (Mode::HaltBefore, true) => Decision::EndExclude,
326                    (Mode::HaltBefore, false) => Decision::Continue,
327                    // All consecutive lines, up to and including the first line
328                    // matching this pattern, are included in the group.
329                    (Mode::HaltWith, true) => Decision::EndInclude,
330                    (Mode::HaltWith, false) => Decision::Continue,
331                };
332
333                match decision {
334                    Decision::Continue => {
335                        let buffered = entry.get_mut();
336                        self.timeouts.reset(&buffered.0, self.config.timeout);
337                        buffered.1.add_next_line(line, context);
338                        None
339                    }
340                    Decision::EndInclude => {
341                        let (src, (key, mut buffered)) = entry.remove_entry();
342                        self.timeouts.remove(&key);
343                        buffered.add_next_line(line, context);
344                        Some((src, Emit::One(buffered.merge())))
345                    }
346                    Decision::EndExclude => {
347                        let (src, (key, buffered)) = entry.remove_entry();
348                        self.timeouts.remove(&key);
349                        Some((src, Emit::Two(buffered.merge(), (line, context, None))))
350                    }
351                }
352            }
353            Entry::Vacant(entry) => {
354                // This line is a candidate for buffering, or passing through.
355                if self.config.start_pattern.is_match(line.as_ref()) {
356                    // It was indeed a new line we need to filter.
357                    // Set the timeout and buffer this line.
358                    let key = self
359                        .timeouts
360                        .insert(entry.key().clone(), self.config.timeout);
361                    entry.insert((key, Aggregate::new(line, context)));
362                    None
363                } else {
364                    // It's just a regular line we don't really care about.
365                    Some((entry.into_key(), Emit::One((line, context, None))))
366                }
367            }
368        }
369    }
370}
371
/// Lines buffered so far for a single source, together with their contexts.
struct Aggregate<C> {
    /// The buffered lines, in arrival order.
    lines: Vec<Bytes>,
    /// Context of the first buffered line.
    initial_context: C,
    /// Context of the most recently appended line; `None` while only the
    /// first line has been buffered.
    last_context: Option<C>,
}
377
378impl<C> Aggregate<C> {
379    fn new(first_line: Bytes, initial_context: C) -> Self {
380        Self {
381            lines: vec![first_line],
382            initial_context,
383            last_context: None,
384        }
385    }
386
387    fn add_next_line(&mut self, line: Bytes, context: C) {
388        self.last_context = Some(context);
389        self.lines.push(line);
390    }
391
392    fn merge(self) -> (Bytes, C, Option<C>) {
393        let capacity = self.lines.iter().map(|line| line.len() + 1).sum::<usize>() - 1;
394        let mut bytes_mut = BytesMut::with_capacity(capacity);
395        let mut first = true;
396        for line in self.lines {
397            if first {
398                first = false;
399            } else {
400                bytes_mut.extend_from_slice(b"\n");
401            }
402            bytes_mut.extend_from_slice(&line);
403        }
404        (bytes_mut.freeze(), self.initial_context, self.last_context)
405    }
406}
407
408#[cfg(test)]
409mod tests {
410    use std::fmt::Write as _;
411
412    use bytes::Bytes;
413    use futures::SinkExt;
414    use similar_asserts::assert_eq;
415
416    use super::*;
417
    /// `ContinueThrough`: lines starting with whitespace are appended to the
    /// preceding line that starts with a non-whitespace character. The
    /// aggregate still in progress when the input ends is flushed as well.
    #[tokio::test]
    async fn mode_continue_through_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part",
            " second part",
            " last part",
            "another normal message",
            "finishing message",
            " last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part\n", " second part\n", " last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message\n",
                    " last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
456
    /// `ContinuePast`: a line ending with a backslash starts or continues an
    /// aggregate, and the first line without one is still included in it.
    /// The aggregate still in progress when the input ends is flushed as
    /// well.
    #[tokio::test]
    async fn mode_continue_past_1() {
        let lines = vec![
            "some usual line",
            "some other usual line",
            "first part \\",
            "second part \\",
            "last part",
            "another normal message",
            "finishing message \\",
            "last part of the incomplete finishing message \\",
        ];
        let config = Config {
            start_pattern: Regex::new("\\\\$").unwrap(),
            condition_pattern: Regex::new("\\\\$").unwrap(),
            mode: Mode::ContinuePast,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("some usual line", 0, None),
            ("some other usual line", 1, None),
            (
                concat!("first part \\\n", "second part \\\n", "last part"),
                2,
                Some(4),
            ),
            ("another normal message", 5, None),
            (
                concat!(
                    "finishing message \\\n",
                    "last part of the incomplete finishing message \\"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
495
    /// `HaltBefore`: every line starts or extends an aggregate (the start
    /// pattern is empty and matches anything), and a line beginning with
    /// `INFO ` or `ERROR ` ends the aggregate in progress without being part
    /// of it.
    #[tokio::test]
    async fn mode_halt_before_1() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^(INFO|ERROR) ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
534
    /// `HaltWith`: a line not ending with `;` starts an aggregate, which
    /// runs up to and including the first line that does end with `;`.
    /// Lines ending with `;` outside of an aggregate are emitted on their
    /// own.
    #[tokio::test]
    async fn mode_halt_with_1() {
        let lines = vec![
            "some usual line;",
            "some other usual line;",
            "first part",
            "second part",
            "last part;",
            "another normal message;",
            "finishing message",
            "last part of the incomplete finishing message",
        ];
        let config = Config {
            start_pattern: Regex::new("[^;]$").unwrap(),
            condition_pattern: Regex::new(";$").unwrap(),
            mode: Mode::HaltWith,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("some usual line;", 0, None),
            ("some other usual line;", 1, None),
            (
                concat!("first part\n", "second part\n", "last part;"),
                2,
                Some(4),
            ),
            ("another normal message;", 5, None),
            (
                concat!(
                    "finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
573
    /// A Java exception followed by indented `at ...` stack frames is merged
    /// into a single message using `ContinueThrough`.
    #[tokio::test]
    async fn use_case_java_exception() {
        let lines = vec![
            "java.lang.Exception",
            "    at com.foo.bar(bar.java:123)",
            "    at com.foo.baz(baz.java:456)",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+at").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "java.lang.Exception\n",
                "    at com.foo.bar(bar.java:123)\n",
                "    at com.foo.baz(baz.java:456)",
            ),
            0,
            Some(2),
        )];
        run_and_assert(&lines, config, &expected).await;
    }
598
    /// A Ruby error followed by tab-indented `from ...` backtrace lines is
    /// merged into a single message using `ContinueThrough`.
    #[tokio::test]
    async fn use_case_ruby_exception() {
        let lines = vec![
            "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)",
            "\tfrom foobar.rb:6:in `bar'",
            "\tfrom foobar.rb:2:in `foo'",
            "\tfrom foobar.rb:9:in `<main>'",
        ];
        let config = Config {
            start_pattern: Regex::new("^[^\\s]").unwrap(),
            condition_pattern: Regex::new("^[\\s]+from").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        let expected = vec![(
            concat!(
                "foobar.rb:6:in `/': divided by 0 (ZeroDivisionError)\n",
                "\tfrom foobar.rb:6:in `bar'\n",
                "\tfrom foobar.rb:2:in `foo'\n",
                "\tfrom foobar.rb:9:in `<main>'",
            ),
            0,
            Some(3),
        )];
        run_and_assert(&lines, config, &expected).await;
    }
625
    /// Exercises the case where one input line makes the aggregator emit two
    /// lines (the finished aggregate plus the line that ended it) in
    /// `ContinueThrough` mode.
    ///
    /// See <https://github.com/vectordotdev/vector/issues/3237>.
    #[tokio::test]
    async fn two_lines_emit_with_continue_through() {
        let lines = vec![
            "not merged 1", // will NOT be stashed, but passed-through
            " merged 1",
            " merged 2",
            "not merged 2", // will be stashed
            " merged 3",
            " merged 4",
            "not merged 3", // will be stashed
            "not merged 4", // will NOT be stashed, but passed-through
            " merged 5",
            "not merged 5", // will be stashed
            " merged 6",
            " merged 7",
            " merged 8",
            "not merged 6", // will be stashed
        ];
        let config = Config {
            start_pattern: Regex::new("^\\s").unwrap(),
            condition_pattern: Regex::new("^\\s").unwrap(),
            mode: Mode::ContinueThrough,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("not merged 1", 0, None),
            (" merged 1\n merged 2", 1, Some(2)),
            ("not merged 2", 3, None),
            (" merged 3\n merged 4", 4, Some(5)),
            ("not merged 3", 6, None),
            ("not merged 4", 7, None),
            (" merged 5", 8, None),
            ("not merged 5", 9, None),
            (" merged 6\n merged 7\n merged 8", 10, Some(12)),
            ("not merged 6", 13, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
665
    /// Exercises the case where one input line makes the aggregator emit two
    /// lines in `HaltBefore` mode: every `START ` line ends the aggregate in
    /// progress and is stashed, so that it can begin the next aggregate.
    #[tokio::test]
    async fn two_lines_emit_with_halt_before() {
        let lines = vec![
            "part 0.1",
            "part 0.2",
            "START msg 1", // will be stashed
            "part 1.1",
            "part 1.2",
            "START msg 2", // will be stashed
            "START msg 3", // will be stashed
            "part 3.1",
            "START msg 4", // will be stashed
            "part 4.1",
            "part 4.2",
            "part 4.3",
            "START msg 5", // will be stashed
        ];
        let config = Config {
            start_pattern: Regex::new("").unwrap(),
            condition_pattern: Regex::new("^START ").unwrap(),
            mode: Mode::HaltBefore,
            timeout: Duration::from_millis(10),
        };
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("part 0.1\npart 0.2", 0, Some(1)),
            ("START msg 1\npart 1.1\npart 1.2", 2, Some(4)),
            ("START msg 2", 5, None),
            ("START msg 3\npart 3.1", 6, Some(7)),
            ("START msg 4\npart 4.1\npart 4.2\npart 4.3", 8, Some(11)),
            ("START msg 5", 12, None),
        ];
        run_and_assert(&lines, config, &expected).await;
    }
699
    /// Checks the configuration built by `Config::for_legacy`: a single
    /// marker pattern used in `HaltBefore` mode. The input and the expected
    /// output are the same as in `mode_halt_before_1`.
    #[tokio::test]
    async fn legacy() {
        let lines = vec![
            "INFO some usual line",
            "INFO some other usual line",
            "INFO first part",
            "second part",
            "last part",
            "ERROR another normal message",
            "ERROR finishing message",
            "last part of the incomplete finishing message",
        ];
        // Each entry is (aggregated text, index of the first line, index of
        // the last line when more than one line was merged).
        let expected = vec![
            ("INFO some usual line", 0, None),
            ("INFO some other usual line", 1, None),
            (
                concat!("INFO first part\n", "second part\n", "last part"),
                2,
                Some(4),
            ),
            ("ERROR another normal message", 5, None),
            (
                concat!(
                    "ERROR finishing message\n",
                    "last part of the incomplete finishing message"
                ),
                6,
                Some(7),
            ),
        ];

        let stream = stream_from_lines(&lines);
        let line_agg = LineAgg::new(
            stream,
            Logic::new(Config::for_legacy(
                Regex::new("^(INFO|ERROR)").unwrap(), // example from the docs
                10,
            )),
        );
        let results = line_agg.collect().await;
        assert_results(results, &expected);
    }
742
743    #[tokio::test]
744    async fn timeout_resets_on_new_line() {
745        // Tests if multiline aggregation updates
746        // it's timeout every time it get's a new line.
747        // To test this we are emitting a single large
748        // multiline but drip feeding it into the aggregator
749        // with 1ms delay.
750
751        let n: usize = 1000;
752        let mut lines = vec![
753            "START msg 1".to_string(), // will be stashed
754        ];
755        for i in 0..n {
756            lines.push(format!("line {i}"));
757        }
758        let config = Config {
759            start_pattern: Regex::new("").unwrap(),
760            condition_pattern: Regex::new("^START ").unwrap(),
761            mode: Mode::HaltBefore,
762            timeout: Duration::from_millis(10),
763        };
764
765        let mut expected = "START msg 1".to_string();
766        for i in 0..n {
767            write!(expected, "\nline {i}").expect("write to String never fails");
768        }
769
770        let (mut send, recv) = futures::channel::mpsc::unbounded();
771
772        let logic = Logic::new(config);
773        let line_agg = LineAgg::new(recv, logic);
774        let results = tokio::spawn(line_agg.collect());
775
776        for (index, line) in lines.iter().enumerate() {
777            let data = (
778                "test.log".to_owned(),
779                Bytes::copy_from_slice(line.as_bytes()),
780                index,
781            );
782            send.send(data).await.unwrap();
783            tokio::time::sleep(Duration::from_millis(1)).await;
784        }
785        drop(send);
786
787        assert_results(
788            results.await.unwrap(),
789            &[(expected.as_str(), 0, Some(lines.len() - 1))],
790        );
791    }
792
793    // Test helpers.
794
    /// Private type alias to be more expressive in the internal implementation.
    /// Used as the line source key (`K`) throughout the tests.
    type Filename = String;
797
798    fn stream_from_lines<'a>(
799        lines: &'a [&'static str],
800    ) -> impl Stream<Item = (Filename, Bytes, usize)> + 'a {
801        futures::stream::iter(lines.iter().enumerate().map(|(index, line)| {
802            (
803                "test.log".to_owned(),
804                Bytes::from_static(line.as_bytes()),
805                index,
806            )
807        }))
808    }
809
810    /// Compare actual output to expected; expected is a list of the expected strings and context
811    fn assert_results(
812        actual: Vec<(Filename, Bytes, usize, Option<usize>)>,
813        expected: &[(&str, usize, Option<usize>)],
814    ) {
815        let expected_mapped: Vec<(Filename, Bytes, usize, Option<usize>)> = expected
816            .iter()
817            .map(|(line, context, last_context)| {
818                (
819                    "test.log".to_owned(),
820                    Bytes::copy_from_slice(line.as_bytes()),
821                    *context,
822                    *last_context,
823                )
824            })
825            .collect();
826
827        assert_eq!(
828            actual, expected_mapped,
829            "actual on the left, expected on the right",
830        );
831    }
832
833    async fn run_and_assert(
834        lines: &[&'static str],
835        config: Config,
836        expected: &[(&'static str, usize, Option<usize>)],
837    ) {
838        let stream = stream_from_lines(lines);
839        let logic = Logic::new(config);
840        let line_agg = LineAgg::new(stream, logic);
841        let results = line_agg.collect().await;
842        assert_results(results, expected);
843    }
844}