1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::{config::GlobalOptions, configurable::configurable_component};
5
6#[cfg(feature = "api")]
7use super::api;
8use super::{
9    BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config, EnrichmentTableOuter,
10    HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition, TransformOuter, compiler, schema,
11};
12use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
13
/// A top-level configuration in builder form, prior to compilation.
// `ConfigBuilder::build` hands this value to `compiler::compile` to obtain a `Config`.
// Sinks, transforms, enrichment tables and tests are parameterised over `String` here, i.e.
// component inputs are still plain strings at this stage.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
    // Global options; `flatten` means their keys live at the top level of the serialized form
    // rather than under a `global` key.
    #[serde(flatten)]
    pub global: GlobalOptions,

    // API options; only present when the `api` feature is compiled in. The schema description
    // is derived from `api::Options` itself.
    #[cfg(feature = "api")]
    #[configurable(derived)]
    #[serde(default)]
    pub api: api::Options,

    // Schema options; marked `docs::hidden` in the configurable metadata.
    #[configurable(derived)]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub schema: schema::Options,

    // Healthcheck options; description derived from `HealthcheckOptions`.
    #[configurable(derived)]
    #[serde(default)]
    pub healthchecks: HealthcheckOptions,

    /// All configured enrichment tables.
    #[serde(default)]
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

    /// All configured sources.
    #[serde(default)]
    pub sources: IndexMap<ComponentKey, SourceOuter>,

    /// All configured sinks.
    #[serde(default)]
    pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,

    /// All configured transforms.
    #[serde(default)]
    pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

    /// All configured unit tests.
    #[serde(default)]
    pub tests: Vec<TestDefinition<String>>,

    /// Optional configuration provider to use.
    // NOTE(review): a `Config` carries no provider, so `From<Config>` always yields `None` here,
    // and `append` replaces this field with the appended builder's value unconditionally.
    pub provider: Option<Providers>,

    /// All configured secrets backends.
    #[serde(default)]
    pub secret: IndexMap<ComponentKey, SecretBackends>,

    /// Graceful shutdown duration.
    // `skip` keeps this out of (de)serialization, so it is only ever set programmatically.
    // NOTE(review): presumably the time allowed for shutdown before it is forced — confirm at
    // the use site.
    #[serde(default, skip)]
    #[doc(hidden)]
    pub graceful_shutdown_duration: Option<Duration>,

    /// Whether the configuration is allowed to be empty.
    // Also `skip`ped by serde; `From<Config>` resets it to `false`.
    // NOTE(review): presumably consulted by the compiler to accept a configuration with no
    // components — confirm against `compiler::compile`.
    #[serde(default, skip)]
    #[doc(hidden)]
    pub allow_empty: bool,
}
78
79impl From<Config> for ConfigBuilder {
80    fn from(config: Config) -> Self {
81        let Config {
82            global,
83            #[cfg(feature = "api")]
84            api,
85            schema,
86            healthchecks,
87            enrichment_tables,
88            sources,
89            sinks,
90            transforms,
91            tests,
92            secret,
93            graceful_shutdown_duration,
94        } = config;
95
96        let transforms = transforms
97            .into_iter()
98            .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
99            .collect();
100
101        let sinks = sinks
102            .into_iter()
103            .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
104            .collect();
105
106        let enrichment_tables = enrichment_tables
107            .into_iter()
108            .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
109            .collect();
110
111        let tests = tests.into_iter().map(TestDefinition::stringify).collect();
112
113        ConfigBuilder {
114            global,
115            #[cfg(feature = "api")]
116            api,
117            schema,
118            healthchecks,
119            enrichment_tables,
120            sources,
121            sinks,
122            transforms,
123            provider: None,
124            tests,
125            secret,
126            graceful_shutdown_duration,
127            allow_empty: false,
128        }
129    }
130}
131
132impl ConfigBuilder {
133    pub fn build(self) -> Result<Config, Vec<String>> {
134        let (config, warnings) = self.build_with_warnings()?;
135
136        for warning in warnings {
137            warn!("{}", warning);
138        }
139
140        Ok(config)
141    }
142
143    pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
144        compiler::compile(self)
145    }
146
147    pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
148        &mut self,
149        key: K,
150        inputs: &[&str],
151        enrichment_table: E,
152    ) {
153        let inputs = inputs
154            .iter()
155            .map(|value| value.to_string())
156            .collect::<Vec<_>>();
157        self.enrichment_tables.insert(
158            ComponentKey::from(key.into()),
159            EnrichmentTableOuter::new(inputs, enrichment_table),
160        );
161    }
162
163    pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
164        self.sources
165            .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
166    }
167
168    pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
169        &mut self,
170        key: K,
171        inputs: &[&str],
172        sink: S,
173    ) {
174        let inputs = inputs
175            .iter()
176            .map(|value| value.to_string())
177            .collect::<Vec<_>>();
178        let sink = SinkOuter::new(inputs, sink);
179        self.add_sink_outer(key, sink);
180    }
181
182    pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
183        self.sinks.insert(ComponentKey::from(key.into()), sink);
184    }
185
186    pub fn add_transform(
189        &mut self,
190        key: impl Into<String>,
191        inputs: &[&str],
192        transform: impl Into<BoxedTransform>,
193    ) {
194        let inputs = inputs
195            .iter()
196            .map(|value| value.to_string())
197            .collect::<Vec<_>>();
198        let transform = TransformOuter::new(inputs, transform);
199
200        self.transforms
201            .insert(ComponentKey::from(key.into()), transform);
202    }
203
204    pub fn set_data_dir(&mut self, path: &Path) {
205        self.global.data_dir = Some(path.to_owned());
206    }
207
208    pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
209        let mut errors = Vec::new();
210
211        #[cfg(feature = "api")]
212        if let Err(error) = self.api.merge(with.api) {
213            errors.push(error);
214        }
215
216        self.provider = with.provider;
217
218        match self.global.merge(with.global) {
219            Err(errs) => errors.extend(errs),
220            Ok(new_global) => self.global = new_global,
221        }
222
223        self.schema.append(with.schema, &mut errors);
224
225        self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
226
227        self.healthchecks.merge(with.healthchecks);
228
229        with.enrichment_tables.keys().for_each(|k| {
230            if self.enrichment_tables.contains_key(k) {
231                errors.push(format!("duplicate enrichment_table name found: {k}"));
232            }
233        });
234        with.sources.keys().for_each(|k| {
235            if self.sources.contains_key(k) {
236                errors.push(format!("duplicate source id found: {k}"));
237            }
238        });
239        with.sinks.keys().for_each(|k| {
240            if self.sinks.contains_key(k) {
241                errors.push(format!("duplicate sink id found: {k}"));
242            }
243        });
244        with.transforms.keys().for_each(|k| {
245            if self.transforms.contains_key(k) {
246                errors.push(format!("duplicate transform id found: {k}"));
247            }
248        });
249        with.tests.iter().for_each(|wt| {
250            if self.tests.iter().any(|t| t.name == wt.name) {
251                errors.push(format!("duplicate test name found: {}", wt.name));
252            }
253        });
254        with.secret.keys().for_each(|k| {
255            if self.secret.contains_key(k) {
256                errors.push(format!("duplicate secret id found: {k}"));
257            }
258        });
259        if !errors.is_empty() {
260            return Err(errors);
261        }
262
263        self.enrichment_tables.extend(with.enrichment_tables);
264        self.sources.extend(with.sources);
265        self.sinks.extend(with.sinks);
266        self.transforms.extend(with.transforms);
267        self.tests.extend(with.tests);
268        self.secret.extend(with.secret);
269
270        Ok(())
271    }
272
273    #[cfg(test)]
274    pub fn from_toml(input: &str) -> Self {
275        crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
276    }
277
278    #[cfg(test)]
279    pub fn from_json(input: &str) -> Self {
280        crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
281    }
282}