1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::{config::GlobalOptions, configurable::configurable_component};
5
6#[cfg(feature = "api")]
7use super::api;
8use super::{
9 BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config, EnrichmentTableOuter,
10 HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition, TransformOuter, compiler, schema,
11};
12use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
13
14#[configurable_component]
16#[derive(Clone, Debug, Default)]
17#[serde(deny_unknown_fields)]
18pub struct ConfigBuilder {
19 #[serde(flatten)]
20 pub global: GlobalOptions,
21
22 #[cfg(feature = "api")]
23 #[configurable(derived)]
24 #[serde(default)]
25 pub api: api::Options,
26
27 #[configurable(derived)]
28 #[serde(default)]
29 pub schema: schema::Options,
30
31 #[configurable(derived)]
32 #[serde(default)]
33 pub healthchecks: HealthcheckOptions,
34
35 #[configurable(metadata(docs::additional_props_description = "An enrichment table."))]
37 #[serde(default)]
38 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
39
40 #[configurable(metadata(docs::additional_props_description = "A source."))]
42 #[serde(default)]
43 pub sources: IndexMap<ComponentKey, SourceOuter>,
44
45 #[configurable(metadata(docs::additional_props_description = "A sink."))]
47 #[serde(default)]
48 pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
49
50 #[configurable(metadata(docs::additional_props_description = "A transform."))]
52 #[serde(default)]
53 pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,
54
55 #[configurable(metadata(docs::hidden))]
57 #[serde(default)]
58 pub tests: Vec<TestDefinition<String>>,
59
60 #[configurable(metadata(docs::hidden))]
65 pub provider: Option<Providers>,
66
67 #[configurable(metadata(docs::additional_props_description = "A secret backend."))]
69 #[serde(default)]
70 pub secret: IndexMap<ComponentKey, SecretBackends>,
71
72 #[serde(default, skip)]
76 #[doc(hidden)]
77 pub graceful_shutdown_duration: Option<Duration>,
78
79 #[serde(default, skip)]
81 #[doc(hidden)]
82 pub allow_empty: bool,
83}
84
85impl From<Config> for ConfigBuilder {
86 fn from(config: Config) -> Self {
87 let Config {
88 global,
89 #[cfg(feature = "api")]
90 api,
91 schema,
92 healthchecks,
93 enrichment_tables,
94 sources,
95 sinks,
96 transforms,
97 tests,
98 secret,
99 graceful_shutdown_duration,
100 } = config;
101
102 let transforms = transforms
103 .into_iter()
104 .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
105 .collect();
106
107 let sinks = sinks
108 .into_iter()
109 .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
110 .collect();
111
112 let enrichment_tables = enrichment_tables
113 .into_iter()
114 .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
115 .collect();
116
117 let tests = tests.into_iter().map(TestDefinition::stringify).collect();
118
119 ConfigBuilder {
120 global,
121 #[cfg(feature = "api")]
122 api,
123 schema,
124 healthchecks,
125 enrichment_tables,
126 sources,
127 sinks,
128 transforms,
129 provider: None,
130 tests,
131 secret,
132 graceful_shutdown_duration,
133 allow_empty: false,
134 }
135 }
136}
137
138impl ConfigBuilder {
139 pub fn build(self) -> Result<Config, Vec<String>> {
140 let (config, warnings) = self.build_with_warnings()?;
141
142 for warning in warnings {
143 warn!("{}", warning);
144 }
145
146 Ok(config)
147 }
148
149 pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
150 compiler::compile(self)
151 }
152
153 pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
154 &mut self,
155 key: K,
156 inputs: &[&str],
157 enrichment_table: E,
158 ) {
159 let inputs = inputs
160 .iter()
161 .map(|value| value.to_string())
162 .collect::<Vec<_>>();
163 self.enrichment_tables.insert(
164 ComponentKey::from(key.into()),
165 EnrichmentTableOuter::new(inputs, enrichment_table),
166 );
167 }
168
169 pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
170 self.sources
171 .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
172 }
173
174 pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
175 &mut self,
176 key: K,
177 inputs: &[&str],
178 sink: S,
179 ) {
180 let inputs = inputs
181 .iter()
182 .map(|value| value.to_string())
183 .collect::<Vec<_>>();
184 let sink = SinkOuter::new(inputs, sink);
185 self.add_sink_outer(key, sink);
186 }
187
188 pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
189 self.sinks.insert(ComponentKey::from(key.into()), sink);
190 }
191
192 pub fn add_transform(
195 &mut self,
196 key: impl Into<String>,
197 inputs: &[&str],
198 transform: impl Into<BoxedTransform>,
199 ) {
200 let inputs = inputs
201 .iter()
202 .map(|value| value.to_string())
203 .collect::<Vec<_>>();
204 let transform = TransformOuter::new(inputs, transform);
205
206 self.transforms
207 .insert(ComponentKey::from(key.into()), transform);
208 }
209
210 pub fn set_data_dir(&mut self, path: &Path) {
211 self.global.data_dir = Some(path.to_owned());
212 }
213
214 pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
215 let mut errors = Vec::new();
216
217 #[cfg(feature = "api")]
218 if let Err(error) = self.api.merge(with.api) {
219 errors.push(error);
220 }
221
222 self.provider = with.provider;
223
224 match self.global.merge(with.global) {
225 Err(errs) => errors.extend(errs),
226 Ok(new_global) => self.global = new_global,
227 }
228
229 self.schema.append(with.schema, &mut errors);
230
231 self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
232
233 self.healthchecks.merge(with.healthchecks);
234
235 with.enrichment_tables.keys().for_each(|k| {
236 if self.enrichment_tables.contains_key(k) {
237 errors.push(format!("duplicate enrichment_table name found: {k}"));
238 }
239 });
240 with.sources.keys().for_each(|k| {
241 if self.sources.contains_key(k) {
242 errors.push(format!("duplicate source id found: {k}"));
243 }
244 });
245 with.sinks.keys().for_each(|k| {
246 if self.sinks.contains_key(k) {
247 errors.push(format!("duplicate sink id found: {k}"));
248 }
249 });
250 with.transforms.keys().for_each(|k| {
251 if self.transforms.contains_key(k) {
252 errors.push(format!("duplicate transform id found: {k}"));
253 }
254 });
255 with.tests.iter().for_each(|wt| {
256 if self.tests.iter().any(|t| t.name == wt.name) {
257 errors.push(format!("duplicate test name found: {}", wt.name));
258 }
259 });
260 with.secret.keys().for_each(|k| {
261 if self.secret.contains_key(k) {
262 errors.push(format!("duplicate secret id found: {k}"));
263 }
264 });
265 if !errors.is_empty() {
266 return Err(errors);
267 }
268
269 self.enrichment_tables.extend(with.enrichment_tables);
270 self.sources.extend(with.sources);
271 self.sinks.extend(with.sinks);
272 self.transforms.extend(with.transforms);
273 self.tests.extend(with.tests);
274 self.secret.extend(with.secret);
275
276 Ok(())
277 }
278
279 #[cfg(test)]
280 pub fn from_toml(input: &str) -> Self {
281 crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
282 }
283
284 #[cfg(test)]
285 pub fn from_json(input: &str) -> Self {
286 crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
287 }
288}