vector/config/
builder.rs

1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::config::GlobalOptions;
5use vector_lib::configurable::configurable_component;
6
7use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
8
9#[cfg(feature = "api")]
10use super::api;
11use super::{
12    compiler, schema, BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config,
13    EnrichmentTableOuter, HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition,
14    TransformOuter,
15};
16
/// A complete Vector configuration.
// Deserializable, not-yet-compiled form of a configuration: `build` hands it to
// `compiler::compile`, and the inputs of sinks, transforms, enrichment tables and tests are
// still plain `String`s at this stage.
//
// NOTE(review): `configurable_component` presumably consumes the `///` text in this struct as
// user-facing schema descriptions -- confirm before rewording any doc comment here.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten` (used on `global` below) -- confirm unknown top-level keys are rejected as intended.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
    // Flattened: the global options are read from the top level of the document rather than
    // from a nested `global` key.
    #[serde(flatten)]
    pub global: GlobalOptions,

    // Only present when the crate is built with the `api` feature.
    #[cfg(feature = "api")]
    #[configurable(derived)]
    #[serde(default)]
    pub api: api::Options,

    #[configurable(derived)]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub schema: schema::Options,

    #[configurable(derived)]
    #[serde(default)]
    pub healthchecks: HealthcheckOptions,

    /// All configured enrichment tables.
    #[serde(default)]
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

    /// All configured sources.
    #[serde(default)]
    pub sources: IndexMap<ComponentKey, SourceOuter>,

    /// All configured sinks.
    #[serde(default)]
    pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,

    /// All configured transforms.
    #[serde(default)]
    pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

    /// All configured unit tests.
    #[serde(default)]
    pub tests: Vec<TestDefinition<String>>,

    /// Optional configuration provider to use.
    ///
    /// Configuration providers allow sourcing configuration information from a source other than
    /// the typical configuration files that must be passed to Vector.
    // `From<Config>` always sets this to `None`, and `append` replaces it with the incoming
    // builder's value unconditionally.
    pub provider: Option<Providers>,

    /// All configured secrets backends.
    #[serde(default)]
    pub secret: IndexMap<ComponentKey, SecretBackends>,

    /// The duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are received.
    /// After the duration has passed, Vector will force shutdown. Default value is 60 seconds. This
    /// value can be set using a [cli arg](crate::cli::RootOpts::graceful_shutdown_limit_secs).
    // `skip`: excluded from (de)serialization, so this is only ever set from code.
    // `From<Config>` copies it over; `append` does not merge it.
    #[serde(default, skip)]
    #[doc(hidden)]
    pub graceful_shutdown_duration: Option<Duration>,

    /// Allow the configuration to be empty, resulting in a topology with no components.
    // `skip`: excluded from (de)serialization, so this is only ever set from code.
    // `From<Config>` resets it to `false`; `append` does not merge it.
    #[serde(default, skip)]
    #[doc(hidden)]
    pub allow_empty: bool,
}
81
82impl From<Config> for ConfigBuilder {
83    fn from(config: Config) -> Self {
84        let Config {
85            global,
86            #[cfg(feature = "api")]
87            api,
88            schema,
89            healthchecks,
90            enrichment_tables,
91            sources,
92            sinks,
93            transforms,
94            tests,
95            secret,
96            graceful_shutdown_duration,
97        } = config;
98
99        let transforms = transforms
100            .into_iter()
101            .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
102            .collect();
103
104        let sinks = sinks
105            .into_iter()
106            .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
107            .collect();
108
109        let enrichment_tables = enrichment_tables
110            .into_iter()
111            .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
112            .collect();
113
114        let tests = tests.into_iter().map(TestDefinition::stringify).collect();
115
116        ConfigBuilder {
117            global,
118            #[cfg(feature = "api")]
119            api,
120            schema,
121            healthchecks,
122            enrichment_tables,
123            sources,
124            sinks,
125            transforms,
126            provider: None,
127            tests,
128            secret,
129            graceful_shutdown_duration,
130            allow_empty: false,
131        }
132    }
133}
134
135impl ConfigBuilder {
136    pub fn build(self) -> Result<Config, Vec<String>> {
137        let (config, warnings) = self.build_with_warnings()?;
138
139        for warning in warnings {
140            warn!("{}", warning);
141        }
142
143        Ok(config)
144    }
145
146    pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
147        compiler::compile(self)
148    }
149
150    pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
151        &mut self,
152        key: K,
153        inputs: &[&str],
154        enrichment_table: E,
155    ) {
156        let inputs = inputs
157            .iter()
158            .map(|value| value.to_string())
159            .collect::<Vec<_>>();
160        self.enrichment_tables.insert(
161            ComponentKey::from(key.into()),
162            EnrichmentTableOuter::new(inputs, enrichment_table),
163        );
164    }
165
166    pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
167        self.sources
168            .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
169    }
170
171    pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
172        &mut self,
173        key: K,
174        inputs: &[&str],
175        sink: S,
176    ) {
177        let inputs = inputs
178            .iter()
179            .map(|value| value.to_string())
180            .collect::<Vec<_>>();
181        let sink = SinkOuter::new(inputs, sink);
182        self.add_sink_outer(key, sink);
183    }
184
185    pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
186        self.sinks.insert(ComponentKey::from(key.into()), sink);
187    }
188
189    // For some feature sets, no transforms are compiled, which leads to no callers using this
190    // method, and in turn, annoying errors about unused variables.
191    pub fn add_transform(
192        &mut self,
193        key: impl Into<String>,
194        inputs: &[&str],
195        transform: impl Into<BoxedTransform>,
196    ) {
197        let inputs = inputs
198            .iter()
199            .map(|value| value.to_string())
200            .collect::<Vec<_>>();
201        let transform = TransformOuter::new(inputs, transform);
202
203        self.transforms
204            .insert(ComponentKey::from(key.into()), transform);
205    }
206
207    pub fn set_data_dir(&mut self, path: &Path) {
208        self.global.data_dir = Some(path.to_owned());
209    }
210
211    pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
212        let mut errors = Vec::new();
213
214        #[cfg(feature = "api")]
215        if let Err(error) = self.api.merge(with.api) {
216            errors.push(error);
217        }
218
219        self.provider = with.provider;
220
221        match self.global.merge(with.global) {
222            Err(errs) => errors.extend(errs),
223            Ok(new_global) => self.global = new_global,
224        }
225
226        self.schema.append(with.schema, &mut errors);
227
228        self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
229
230        self.healthchecks.merge(with.healthchecks);
231
232        with.enrichment_tables.keys().for_each(|k| {
233            if self.enrichment_tables.contains_key(k) {
234                errors.push(format!("duplicate enrichment_table name found: {k}"));
235            }
236        });
237        with.sources.keys().for_each(|k| {
238            if self.sources.contains_key(k) {
239                errors.push(format!("duplicate source id found: {k}"));
240            }
241        });
242        with.sinks.keys().for_each(|k| {
243            if self.sinks.contains_key(k) {
244                errors.push(format!("duplicate sink id found: {k}"));
245            }
246        });
247        with.transforms.keys().for_each(|k| {
248            if self.transforms.contains_key(k) {
249                errors.push(format!("duplicate transform id found: {k}"));
250            }
251        });
252        with.tests.iter().for_each(|wt| {
253            if self.tests.iter().any(|t| t.name == wt.name) {
254                errors.push(format!("duplicate test name found: {}", wt.name));
255            }
256        });
257        with.secret.keys().for_each(|k| {
258            if self.secret.contains_key(k) {
259                errors.push(format!("duplicate secret id found: {k}"));
260            }
261        });
262        if !errors.is_empty() {
263            return Err(errors);
264        }
265
266        self.enrichment_tables.extend(with.enrichment_tables);
267        self.sources.extend(with.sources);
268        self.sinks.extend(with.sinks);
269        self.transforms.extend(with.transforms);
270        self.tests.extend(with.tests);
271        self.secret.extend(with.secret);
272
273        Ok(())
274    }
275
276    #[cfg(test)]
277    pub fn from_toml(input: &str) -> Self {
278        crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
279    }
280
281    #[cfg(test)]
282    pub fn from_json(input: &str) -> Self {
283        crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
284    }
285}