vector_core/source_sender/
builder.rs1use std::{collections::HashMap, time::Duration};
2
3use metrics::{Histogram, histogram};
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{CHUNK_SIZE, LAG_TIME_NAME, Output, SourceSender, SourceSenderItem};
8use crate::config::{ComponentKey, OutputId, SourceOutput};
9
/// Incrementally assembles a [`SourceSender`]: outputs are registered one at
/// a time with `add_source_output` and the finished sender is produced by
/// `build`.
pub struct Builder {
    /// Buffer size passed to `Output::new_with_buffer` for every output
    /// created by this builder. Starts at `CHUNK_SIZE`.
    buf_size: usize,
    /// Output created for a `SourceOutput` whose `port` is `None`, if one has
    /// been registered.
    default_output: Option<Output>,
    /// Outputs created for `SourceOutput`s with a named port, keyed by that
    /// port name.
    named_outputs: HashMap<String, Output>,
    /// Histogram handle cloned into every output that gets created.
    /// Initialised from `LAG_TIME_NAME` by `Default`.
    lag_time: Option<Histogram>,
    /// Optional duration forwarded unchanged to every output that gets
    /// created. NOTE(review): presumably a send timeout -- confirm against
    /// `Output::new_with_buffer`.
    timeout: Option<Duration>,
}
17
18impl Default for Builder {
19 fn default() -> Self {
20 Self {
21 buf_size: CHUNK_SIZE,
22 default_output: None,
23 named_outputs: Default::default(),
24 lag_time: Some(histogram!(LAG_TIME_NAME)),
25 timeout: None,
26 }
27 }
28}
29
30impl Builder {
31 #[must_use]
32 pub fn with_buffer(mut self, n: usize) -> Self {
33 self.buf_size = n;
34 self
35 }
36
37 #[must_use]
38 pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
39 self.timeout = timeout;
40 self
41 }
42
43 pub fn add_source_output(
44 &mut self,
45 output: SourceOutput,
46 component_key: ComponentKey,
47 ) -> LimitedReceiver<SourceSenderItem> {
48 let lag_time = self.lag_time.clone();
49 let log_definition = output.schema_definition.clone();
50 let output_id = OutputId {
51 component: component_key,
52 port: output.port.clone(),
53 };
54 match output.port {
55 None => {
56 let (output, rx) = Output::new_with_buffer(
57 self.buf_size,
58 DEFAULT_OUTPUT.to_owned(),
59 lag_time,
60 log_definition,
61 output_id,
62 self.timeout,
63 );
64 self.default_output = Some(output);
65 rx
66 }
67 Some(name) => {
68 let (output, rx) = Output::new_with_buffer(
69 self.buf_size,
70 name.clone(),
71 lag_time,
72 log_definition,
73 output_id,
74 self.timeout,
75 );
76 self.named_outputs.insert(name, output);
77 rx
78 }
79 }
80 }
81
82 pub fn build(self) -> SourceSender {
83 SourceSender {
84 default_output: self.default_output,
85 named_outputs: self.named_outputs,
86 }
87 }
88}