vector/config/
builder.rs

1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::{config::GlobalOptions, configurable::configurable_component};
5
6#[cfg(feature = "api")]
7use super::api;
8use super::{
9    BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config, EnrichmentTableOuter,
10    HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition, TransformOuter, compiler, schema,
11};
12use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
13
// Mutable, deserializable form of a Vector configuration. Within this file it is
// created from a `Config` (`From<Config>`), extended through the `add_*` and
// `append` methods, and turned into a `Config` by `build`/`build_with_warnings`.
//
// Sinks, transforms, enrichment tables and tests are parameterised with `String`
// here, i.e. their inputs are kept as plain strings at this stage.
//
// NOTE(review): the `///` comments below are presumably picked up by
// `configurable_component` for generated schema/docs, so reviewer notes are
// written as `//` comments to leave that text untouched -- confirm.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in
// combination with `flatten` (used on `global`); confirm unknown top-level keys
// are rejected as intended.
/// A complete Vector configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
    // Flattened: the fields of `GlobalOptions` are read from the same level as the
    // fields of this struct rather than from a nested `global` key.
    #[serde(flatten)]
    pub global: GlobalOptions,

    // Only present when the crate is built with the `api` feature.
    #[cfg(feature = "api")]
    #[configurable(derived)]
    #[serde(default)]
    pub api: api::Options,

    #[configurable(derived)]
    #[serde(default)]
    pub schema: schema::Options,

    #[configurable(derived)]
    #[serde(default)]
    pub healthchecks: HealthcheckOptions,

    // The component maps below are `IndexMap`s keyed by `ComponentKey`; each one
    // falls back to an empty map when its key is absent (`#[serde(default)]`).

    /// All configured enrichment tables.
    #[configurable(metadata(docs::additional_props_description = "An enrichment table."))]
    #[serde(default)]
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

    /// All configured sources.
    #[configurable(metadata(docs::additional_props_description = "A source."))]
    #[serde(default)]
    pub sources: IndexMap<ComponentKey, SourceOuter>,

    /// All configured sinks.
    #[configurable(metadata(docs::additional_props_description = "A sink."))]
    #[serde(default)]
    pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,

    /// All configured transforms.
    #[configurable(metadata(docs::additional_props_description = "A transform."))]
    #[serde(default)]
    pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

    /// All configured unit tests.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub tests: Vec<TestDefinition<String>>,

    // No `#[serde(default)]` here; as an `Option`, a missing key is expected to
    // deserialize to `None` (standard serde behaviour -- TODO confirm the
    // `configurable_component` derive does not change this).
    /// Optional configuration provider to use.
    ///
    /// Configuration providers allow sourcing configuration information from a source other than
    /// the typical configuration files that must be passed to Vector.
    #[configurable(metadata(docs::hidden))]
    pub provider: Option<Providers>,

    /// All configured secrets backends.
    #[configurable(metadata(docs::additional_props_description = "A secret backend."))]
    #[serde(default)]
    pub secret: IndexMap<ComponentKey, SecretBackends>,

    // `skip`: never read from or written to a configuration file; it starts out as
    // `None` after deserialization and has to be set programmatically.
    /// The duration in seconds to wait for graceful shutdown after SIGINT or SIGTERM are received.
    /// After the duration has passed, Vector will force shutdown. Default value is 60 seconds. This
    /// value can be set using a [cli arg](crate::cli::RootOpts::graceful_shutdown_limit_secs).
    #[serde(default, skip)]
    #[doc(hidden)]
    pub graceful_shutdown_duration: Option<Duration>,

    // `skip`: not configurable from a file; `false` unless set programmatically.
    /// Allow the configuration to be empty, resulting in a topology with no components.
    #[serde(default, skip)]
    #[doc(hidden)]
    pub allow_empty: bool,
}
84
85impl From<Config> for ConfigBuilder {
86    fn from(config: Config) -> Self {
87        let Config {
88            global,
89            #[cfg(feature = "api")]
90            api,
91            schema,
92            healthchecks,
93            enrichment_tables,
94            sources,
95            sinks,
96            transforms,
97            tests,
98            secret,
99            graceful_shutdown_duration,
100        } = config;
101
102        let transforms = transforms
103            .into_iter()
104            .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
105            .collect();
106
107        let sinks = sinks
108            .into_iter()
109            .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
110            .collect();
111
112        let enrichment_tables = enrichment_tables
113            .into_iter()
114            .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
115            .collect();
116
117        let tests = tests.into_iter().map(TestDefinition::stringify).collect();
118
119        ConfigBuilder {
120            global,
121            #[cfg(feature = "api")]
122            api,
123            schema,
124            healthchecks,
125            enrichment_tables,
126            sources,
127            sinks,
128            transforms,
129            provider: None,
130            tests,
131            secret,
132            graceful_shutdown_duration,
133            allow_empty: false,
134        }
135    }
136}
137
138impl ConfigBuilder {
139    pub fn build(self) -> Result<Config, Vec<String>> {
140        let (config, warnings) = self.build_with_warnings()?;
141
142        for warning in warnings {
143            warn!("{}", warning);
144        }
145
146        Ok(config)
147    }
148
149    pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
150        compiler::compile(self)
151    }
152
153    pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
154        &mut self,
155        key: K,
156        inputs: &[&str],
157        enrichment_table: E,
158    ) {
159        let inputs = inputs
160            .iter()
161            .map(|value| value.to_string())
162            .collect::<Vec<_>>();
163        self.enrichment_tables.insert(
164            ComponentKey::from(key.into()),
165            EnrichmentTableOuter::new(inputs, enrichment_table),
166        );
167    }
168
169    pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
170        self.sources
171            .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
172    }
173
174    pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
175        &mut self,
176        key: K,
177        inputs: &[&str],
178        sink: S,
179    ) {
180        let inputs = inputs
181            .iter()
182            .map(|value| value.to_string())
183            .collect::<Vec<_>>();
184        let sink = SinkOuter::new(inputs, sink);
185        self.add_sink_outer(key, sink);
186    }
187
188    pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
189        self.sinks.insert(ComponentKey::from(key.into()), sink);
190    }
191
192    // For some feature sets, no transforms are compiled, which leads to no callers using this
193    // method, and in turn, annoying errors about unused variables.
194    pub fn add_transform(
195        &mut self,
196        key: impl Into<String>,
197        inputs: &[&str],
198        transform: impl Into<BoxedTransform>,
199    ) {
200        let inputs = inputs
201            .iter()
202            .map(|value| value.to_string())
203            .collect::<Vec<_>>();
204        let transform = TransformOuter::new(inputs, transform);
205
206        self.transforms
207            .insert(ComponentKey::from(key.into()), transform);
208    }
209
210    pub fn set_data_dir(&mut self, path: &Path) {
211        self.global.data_dir = Some(path.to_owned());
212    }
213
214    pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
215        let mut errors = Vec::new();
216
217        #[cfg(feature = "api")]
218        if let Err(error) = self.api.merge(with.api) {
219            errors.push(error);
220        }
221
222        self.provider = with.provider;
223
224        match self.global.merge(with.global) {
225            Err(errs) => errors.extend(errs),
226            Ok(new_global) => self.global = new_global,
227        }
228
229        self.schema.append(with.schema, &mut errors);
230
231        self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
232
233        self.healthchecks.merge(with.healthchecks);
234
235        with.enrichment_tables.keys().for_each(|k| {
236            if self.enrichment_tables.contains_key(k) {
237                errors.push(format!("duplicate enrichment_table name found: {k}"));
238            }
239        });
240        with.sources.keys().for_each(|k| {
241            if self.sources.contains_key(k) {
242                errors.push(format!("duplicate source id found: {k}"));
243            }
244        });
245        with.sinks.keys().for_each(|k| {
246            if self.sinks.contains_key(k) {
247                errors.push(format!("duplicate sink id found: {k}"));
248            }
249        });
250        with.transforms.keys().for_each(|k| {
251            if self.transforms.contains_key(k) {
252                errors.push(format!("duplicate transform id found: {k}"));
253            }
254        });
255        with.tests.iter().for_each(|wt| {
256            if self.tests.iter().any(|t| t.name == wt.name) {
257                errors.push(format!("duplicate test name found: {}", wt.name));
258            }
259        });
260        with.secret.keys().for_each(|k| {
261            if self.secret.contains_key(k) {
262                errors.push(format!("duplicate secret id found: {k}"));
263            }
264        });
265        if !errors.is_empty() {
266            return Err(errors);
267        }
268
269        self.enrichment_tables.extend(with.enrichment_tables);
270        self.sources.extend(with.sources);
271        self.sinks.extend(with.sinks);
272        self.transforms.extend(with.transforms);
273        self.tests.extend(with.tests);
274        self.secret.extend(with.secret);
275
276        Ok(())
277    }
278
279    #[cfg(test)]
280    pub fn from_toml(input: &str) -> Self {
281        crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
282    }
283
284    #[cfg(test)]
285    pub fn from_json(input: &str) -> Self {
286        crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
287    }
288}