vector_core/source_sender/
builder.rs1use std::{collections::HashMap, time::Duration};
2
3use metrics::{Histogram, histogram};
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
8use crate::config::{ComponentKey, OutputId, SourceOutput};
9
/// Accumulates the outputs and per-output settings from which a
/// [`SourceSender`] is assembled.
///
/// Settings are applied with the `with_*` methods, outputs are registered with
/// `add_source_output`, and `build` moves the registered outputs into the
/// final [`SourceSender`].
pub struct Builder {
    /// Buffer size passed to `Output::new_with_buffer` for each output that is
    /// added; `Default` initialises it to `CHUNK_SIZE`.
    buf_size: usize,
    /// Output registered for a `SourceOutput` that has no port, if any.
    default_output: Option<Output>,
    /// Outputs registered for `SourceOutput`s that name a port, keyed by that
    /// port name.
    named_outputs: HashMap<String, Output>,
    /// Histogram handle cloned into every output that is added; `Default`
    /// initialises it to the histogram named `LAG_TIME_NAME`.
    lag_time: Option<Histogram>,
    /// Optional timeout forwarded unchanged to every output that is added.
    /// NOTE(review): how the timeout is applied is decided by `Output`, which
    /// is not visible from this file.
    timeout: Option<Duration>,
    /// Optional value forwarded unchanged to every output that is added.
    /// NOTE(review): presumably the smoothing factor of an exponentially
    /// weighted moving average -- confirm against `Output`.
    ewma_alpha: Option<f64>,
}
18
19impl Default for Builder {
20 fn default() -> Self {
21 Self {
22 buf_size: CHUNK_SIZE,
23 default_output: None,
24 named_outputs: Default::default(),
25 lag_time: Some(histogram!(LAG_TIME_NAME)),
26 timeout: None,
27 ewma_alpha: None,
28 }
29 }
30}
31
32impl Builder {
33 #[must_use]
34 pub fn with_buffer(mut self, n: usize) -> Self {
35 self.buf_size = n;
36 self
37 }
38
39 #[must_use]
40 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
41 self.timeout = timeout;
42 self
43 }
44
45 #[must_use]
46 pub fn with_ewma_alpha(mut self, alpha: Option<f64>) -> Self {
47 self.ewma_alpha = alpha;
48 self
49 }
50
51 pub fn add_source_output(
52 &mut self,
53 output: SourceOutput,
54 component_key: ComponentKey,
55 ) -> LimitedReceiver<SourceSenderItem> {
56 let lag_time = self.lag_time.clone();
57 let log_definition = output.schema_definition.clone();
58 let output_id = OutputId {
59 component: component_key,
60 port: output.port.clone(),
61 };
62 match output.port {
63 None => {
64 let (output, rx) = Output::new_with_buffer(
65 self.buf_size,
66 DEFAULT_OUTPUT.to_owned(),
67 lag_time,
68 log_definition,
69 output_id,
70 self.timeout,
71 self.ewma_alpha,
72 );
73 self.default_output = Some(output);
74 rx
75 }
76 Some(name) => {
77 let (output, rx) = Output::new_with_buffer(
78 self.buf_size,
79 name.clone(),
80 lag_time,
81 log_definition,
82 output_id,
83 self.timeout,
84 self.ewma_alpha,
85 );
86 self.named_outputs.insert(name, output);
87 rx
88 }
89 }
90 }
91
92 pub fn build(self) -> SourceSender {
93 SourceSender {
94 default_output: self.default_output,
95 named_outputs: self.named_outputs,
96 }
97 }
98}