vector/sources/util/
multiline_config.rs

1use std::{convert::TryFrom, time::Duration};
2
3use regex::bytes::Regex;
4use serde_with::serde_as;
5use snafu::{ResultExt, Snafu};
6use vector_lib::configurable::configurable_component;
7
8use crate::line_agg;
9
10/// Configuration of multi-line aggregation.
11#[serde_as]
12#[configurable_component]
13#[derive(Clone, Debug, PartialEq, Eq)]
14#[serde(deny_unknown_fields)]
15pub struct MultilineConfig {
16    /// Regular expression pattern that is used to match the start of a new message.
17    #[configurable(metadata(docs::examples = "^[\\s]+"))]
18    #[configurable(metadata(docs::examples = "\\\\$"))]
19    #[configurable(metadata(docs::examples = "^(INFO|ERROR) "))]
20    #[configurable(metadata(docs::examples = ";$"))]
21    pub start_pattern: String,
22
23    /// Regular expression pattern that is used to determine whether or not more lines should be read.
24    ///
25    /// This setting must be configured in conjunction with `mode`.
26    #[configurable(metadata(docs::examples = "^[\\s]+"))]
27    #[configurable(metadata(docs::examples = "\\\\$"))]
28    #[configurable(metadata(docs::examples = "^(INFO|ERROR) "))]
29    #[configurable(metadata(docs::examples = ";$"))]
30    pub condition_pattern: String,
31
32    /// Aggregation mode.
33    ///
34    /// This setting must be configured in conjunction with `condition_pattern`.
35    #[configurable(derived)]
36    pub mode: line_agg::Mode,
37
38    /// The maximum amount of time to wait for the next additional line, in milliseconds.
39    ///
40    /// Once this timeout is reached, the buffered message is guaranteed to be flushed, even if incomplete.
41    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
42    #[configurable(metadata(docs::examples = 1000))]
43    #[configurable(metadata(docs::examples = 600000))]
44    #[configurable(metadata(docs::human_name = "Timeout"))]
45    pub timeout_ms: Duration,
46}
47
48impl TryFrom<&MultilineConfig> for line_agg::Config {
49    type Error = Error;
50
51    fn try_from(config: &MultilineConfig) -> Result<Self, Self::Error> {
52        let MultilineConfig {
53            start_pattern,
54            condition_pattern,
55            mode,
56            timeout_ms,
57        } = config;
58
59        let start_pattern = Regex::new(start_pattern)
60            .with_context(|_| InvalidMultilineStartPatternSnafu { start_pattern })?;
61        let condition_pattern = Regex::new(condition_pattern)
62            .with_context(|_| InvalidMultilineConditionPatternSnafu { condition_pattern })?;
63        let timeout = *timeout_ms;
64
65        Ok(Self {
66            start_pattern,
67            condition_pattern,
68            mode: *mode,
69            timeout,
70        })
71    }
72}
73
74#[derive(Debug, Snafu)]
75pub enum Error {
76    #[snafu(display(
77        "unable to parse multiline start pattern from {:?}: {}",
78        start_pattern,
79        source
80    ))]
81    InvalidMultilineStartPattern {
82        start_pattern: String,
83        source: regex::Error,
84    },
85    #[snafu(display(
86        "unable to parse multiline condition pattern from {:?}: {}",
87        condition_pattern,
88        source
89    ))]
90    InvalidMultilineConditionPattern {
91        condition_pattern: String,
92        source: regex::Error,
93    },
94}