vector_core/source_sender/
builder.rs

1use std::{collections::HashMap, time::Duration};
2
3use metrics::histogram;
4use vector_buffers::topology::channel::LimitedReceiver;
5use vector_common::internal_event::DEFAULT_OUTPUT;
6
7use super::{
8    CHUNK_SIZE, LAG_TIME_NAME, Output, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME,
9    SourceSender, SourceSenderItem,
10};
11use crate::config::{ComponentKey, OutputId, SourceOutput};
12
13pub struct Builder {
14    buf_size: usize,
15    default_output: Option<Output>,
16    named_outputs: HashMap<String, Output>,
17    output_metrics: OutputMetrics,
18    timeout: Option<Duration>,
19    ewma_half_life_seconds: Option<f64>,
20}
21
22impl Default for Builder {
23    fn default() -> Self {
24        Self {
25            buf_size: CHUNK_SIZE,
26            default_output: None,
27            named_outputs: Default::default(),
28            output_metrics: OutputMetrics::new(
29                Some(histogram!(LAG_TIME_NAME)),
30                Some(histogram!(SEND_LATENCY_NAME)),
31                Some(histogram!(SEND_BATCH_LATENCY_NAME)),
32            ),
33            timeout: None,
34            ewma_half_life_seconds: None,
35        }
36    }
37}
38
39impl Builder {
40    #[must_use]
41    pub fn with_buffer(mut self, n: usize) -> Self {
42        self.buf_size = n;
43        self
44    }
45
46    #[must_use]
47    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
48        self.timeout = timeout;
49        self
50    }
51
52    #[must_use]
53    pub fn with_ewma_half_life_seconds(mut self, half_life_seconds: Option<f64>) -> Self {
54        self.ewma_half_life_seconds = half_life_seconds;
55        self
56    }
57
58    pub fn add_source_output(
59        &mut self,
60        output: SourceOutput,
61        component_key: ComponentKey,
62    ) -> LimitedReceiver<SourceSenderItem> {
63        let log_definition = output.schema_definition.clone();
64        let output_id = OutputId {
65            component: component_key,
66            port: output.port.clone(),
67        };
68        match output.port {
69            None => {
70                let (output, rx) = Output::new_with_buffer(
71                    self.buf_size,
72                    DEFAULT_OUTPUT.to_owned(),
73                    self.output_metrics.clone(),
74                    log_definition,
75                    output_id,
76                    self.timeout,
77                    self.ewma_half_life_seconds,
78                );
79                self.default_output = Some(output);
80                rx
81            }
82            Some(name) => {
83                let (output, rx) = Output::new_with_buffer(
84                    self.buf_size,
85                    name.clone(),
86                    self.output_metrics.clone(),
87                    log_definition,
88                    output_id,
89                    self.timeout,
90                    self.ewma_half_life_seconds,
91                );
92                self.named_outputs.insert(name, output);
93                rx
94            }
95        }
96    }
97
98    pub fn build(self) -> SourceSender {
99        SourceSender {
100            default_output: self.default_output,
101            named_outputs: self.named_outputs,
102        }
103    }
104}