vector_core/source_sender/
builder.rs

1use std::{collections::HashMap, time::Duration};
2
3use metrics::{Histogram, histogram};
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
8use crate::config::{ComponentKey, OutputId, SourceOutput};
9
10pub struct Builder {
11    buf_size: usize,
12    default_output: Option<Output>,
13    named_outputs: HashMap<String, Output>,
14    lag_time: Option<Histogram>,
15    timeout: Option<Duration>,
16    ewma_alpha: Option<f64>,
17}
18
19impl Default for Builder {
20    fn default() -> Self {
21        Self {
22            buf_size: CHUNK_SIZE,
23            default_output: None,
24            named_outputs: Default::default(),
25            lag_time: Some(histogram!(LAG_TIME_NAME)),
26            timeout: None,
27            ewma_alpha: None,
28        }
29    }
30}
31
32impl Builder {
33    #[must_use]
34    pub fn with_buffer(mut self, n: usize) -> Self {
35        self.buf_size = n;
36        self
37    }
38
39    #[must_use]
40    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
41        self.timeout = timeout;
42        self
43    }
44
45    #[must_use]
46    pub fn with_ewma_alpha(mut self, alpha: Option<f64>) -> Self {
47        self.ewma_alpha = alpha;
48        self
49    }
50
51    pub fn add_source_output(
52        &mut self,
53        output: SourceOutput,
54        component_key: ComponentKey,
55    ) -> LimitedReceiver<SourceSenderItem> {
56        let lag_time = self.lag_time.clone();
57        let log_definition = output.schema_definition.clone();
58        let output_id = OutputId {
59            component: component_key,
60            port: output.port.clone(),
61        };
62        match output.port {
63            None => {
64                let (output, rx) = Output::new_with_buffer(
65                    self.buf_size,
66                    DEFAULT_OUTPUT.to_owned(),
67                    lag_time,
68                    log_definition,
69                    output_id,
70                    self.timeout,
71                    self.ewma_alpha,
72                );
73                self.default_output = Some(output);
74                rx
75            }
76            Some(name) => {
77                let (output, rx) = Output::new_with_buffer(
78                    self.buf_size,
79                    name.clone(),
80                    lag_time,
81                    log_definition,
82                    output_id,
83                    self.timeout,
84                    self.ewma_alpha,
85                );
86                self.named_outputs.insert(name, output);
87                rx
88            }
89        }
90    }
91
92    pub fn build(self) -> SourceSender {
93        SourceSender {
94            default_output: self.default_output,
95            named_outputs: self.named_outputs,
96        }
97    }
98}