vector_core/source_sender/
builder.rs1use std::{collections::HashMap, time::Duration};
2
3use metrics::histogram;
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{
8 CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME,
9 SourceSender, SourceSenderItem,
10};
11use crate::config::{ComponentKey, OutputId, SourceOutput};
12
13pub struct Builder {
14 buf_size: usize,
15 default_output: Option<Output>,
16 named_outputs: HashMap<String, Output>,
17 output_metrics: OutputMetrics,
18 timeout: Option<Duration>,
19 ewma_half_life_seconds: Option<f64>,
20}
21
22impl Default for Builder {
23 fn default() -> Self {
24 Self {
25 buf_size: CHUNK_SIZE,
26 default_output: None,
27 named_outputs: Default::default(),
28 output_metrics: OutputMetrics::new(
29 Some(histogram!(LAG_TIME_NAME)),
30 Some(histogram!(SEND_LATENCY_NAME)),
31 Some(histogram!(SEND_BATCH_LATENCY_NAME)),
32 ),
33 timeout: None,
34 ewma_half_life_seconds: None,
35 }
36 }
37}
38
39impl Builder {
40 #[must_use]
41 pub fn with_buffer(mut self, n: usize) -> Self {
42 self.buf_size = n;
43 self
44 }
45
46 #[must_use]
47 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
48 self.timeout = timeout;
49 self
50 }
51
52 #[must_use]
53 pub fn with_ewma_half_life_seconds(mut self, half_life_seconds: Option<f64>) -> Self {
54 self.ewma_half_life_seconds = half_life_seconds;
55 self
56 }
57
58 pub fn add_source_output(
59 &mut self,
60 output: SourceOutput,
61 component_key: ComponentKey,
62 ) -> LimitedReceiver<SourceSenderItem> {
63 let log_definition = output.schema_definition.clone();
64 let output_id = OutputId {
65 component: component_key,
66 port: output.port.clone(),
67 };
68 match output.port {
69 None => {
70 let (output, rx) = Output::new_with_buffer(
71 self.buf_size,
72 DEFAULT_OUTPUT.to_owned(),
73 self.output_metrics.clone(),
74 log_definition,
75 output_id,
76 self.timeout,
77 self.ewma_half_life_seconds,
78 );
79 self.default_output = Some(output);
80 rx
81 }
82 Some(name) => {
83 let (output, rx) = Output::new_with_buffer(
84 self.buf_size,
85 name.clone(),
86 self.output_metrics.clone(),
87 log_definition,
88 output_id,
89 self.timeout,
90 self.ewma_half_life_seconds,
91 );
92 self.named_outputs.insert(name, output);
93 rx
94 }
95 }
96 }
97
98 pub fn build(self) -> SourceSender {
99 SourceSender {
100 default_output: self.default_output,
101 named_outputs: self.named_outputs,
102 }
103 }
104}