vector_core/source_sender/
builder.rs

1use std::{collections::HashMap, time::Duration};
2
3use metrics::{Histogram, histogram};
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
8use crate::config::{ComponentKey, OutputId, SourceOutput};
9
10pub struct Builder {
11    buf_size: usize,
12    default_output: Option<Output>,
13    named_outputs: HashMap<String, Output>,
14    lag_time: Option<Histogram>,
15    timeout: Option<Duration>,
16}
17
18impl Default for Builder {
19    fn default() -> Self {
20        Self {
21            buf_size: CHUNK_SIZE,
22            default_output: None,
23            named_outputs: Default::default(),
24            lag_time: Some(histogram!(LAG_TIME_NAME)),
25            timeout: None,
26        }
27    }
28}
29
30impl Builder {
31    #[must_use]
32    pub fn with_buffer(mut self, n: usize) -> Self {
33        self.buf_size = n;
34        self
35    }
36
37    #[must_use]
38    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
39        self.timeout = timeout;
40        self
41    }
42
43    pub fn add_source_output(
44        &mut self,
45        output: SourceOutput,
46        component_key: ComponentKey,
47    ) -> LimitedReceiver<SourceSenderItem> {
48        let lag_time = self.lag_time.clone();
49        let log_definition = output.schema_definition.clone();
50        let output_id = OutputId {
51            component: component_key,
52            port: output.port.clone(),
53        };
54        match output.port {
55            None => {
56                let (output, rx) = Output::new_with_buffer(
57                    self.buf_size,
58                    DEFAULT_OUTPUT.to_owned(),
59                    lag_time,
60                    log_definition,
61                    output_id,
62                    self.timeout,
63                );
64                self.default_output = Some(output);
65                rx
66            }
67            Some(name) => {
68                let (output, rx) = Output::new_with_buffer(
69                    self.buf_size,
70                    name.clone(),
71                    lag_time,
72                    log_definition,
73                    output_id,
74                    self.timeout,
75                );
76                self.named_outputs.insert(name, output);
77                rx
78            }
79        }
80    }
81
82    pub fn build(self) -> SourceSender {
83        SourceSender {
84            default_output: self.default_output,
85            named_outputs: self.named_outputs,
86        }
87    }
88}