1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::config::GlobalOptions;
5use vector_lib::configurable::configurable_component;
6
7use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
8
9#[cfg(feature = "api")]
10use super::api;
11use super::{
12 compiler, schema, BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config,
13 EnrichmentTableOuter, HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition,
14 TransformOuter,
15};
16
17#[configurable_component]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct ConfigBuilder {
22 #[serde(flatten)]
23 pub global: GlobalOptions,
24
25 #[cfg(feature = "api")]
26 #[configurable(derived)]
27 #[serde(default)]
28 pub api: api::Options,
29
30 #[configurable(derived)]
31 #[configurable(metadata(docs::hidden))]
32 #[serde(default)]
33 pub schema: schema::Options,
34
35 #[configurable(derived)]
36 #[serde(default)]
37 pub healthchecks: HealthcheckOptions,
38
39 #[serde(default)]
41 pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,
42
43 #[serde(default)]
45 pub sources: IndexMap<ComponentKey, SourceOuter>,
46
47 #[serde(default)]
49 pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,
50
51 #[serde(default)]
53 pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,
54
55 #[serde(default)]
57 pub tests: Vec<TestDefinition<String>>,
58
59 pub provider: Option<Providers>,
64
65 #[serde(default)]
67 pub secret: IndexMap<ComponentKey, SecretBackends>,
68
69 #[serde(default, skip)]
73 #[doc(hidden)]
74 pub graceful_shutdown_duration: Option<Duration>,
75
76 #[serde(default, skip)]
78 #[doc(hidden)]
79 pub allow_empty: bool,
80}
81
82impl From<Config> for ConfigBuilder {
83 fn from(config: Config) -> Self {
84 let Config {
85 global,
86 #[cfg(feature = "api")]
87 api,
88 schema,
89 healthchecks,
90 enrichment_tables,
91 sources,
92 sinks,
93 transforms,
94 tests,
95 secret,
96 graceful_shutdown_duration,
97 } = config;
98
99 let transforms = transforms
100 .into_iter()
101 .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
102 .collect();
103
104 let sinks = sinks
105 .into_iter()
106 .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
107 .collect();
108
109 let enrichment_tables = enrichment_tables
110 .into_iter()
111 .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
112 .collect();
113
114 let tests = tests.into_iter().map(TestDefinition::stringify).collect();
115
116 ConfigBuilder {
117 global,
118 #[cfg(feature = "api")]
119 api,
120 schema,
121 healthchecks,
122 enrichment_tables,
123 sources,
124 sinks,
125 transforms,
126 provider: None,
127 tests,
128 secret,
129 graceful_shutdown_duration,
130 allow_empty: false,
131 }
132 }
133}
134
135impl ConfigBuilder {
136 pub fn build(self) -> Result<Config, Vec<String>> {
137 let (config, warnings) = self.build_with_warnings()?;
138
139 for warning in warnings {
140 warn!("{}", warning);
141 }
142
143 Ok(config)
144 }
145
146 pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
147 compiler::compile(self)
148 }
149
150 pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
151 &mut self,
152 key: K,
153 inputs: &[&str],
154 enrichment_table: E,
155 ) {
156 let inputs = inputs
157 .iter()
158 .map(|value| value.to_string())
159 .collect::<Vec<_>>();
160 self.enrichment_tables.insert(
161 ComponentKey::from(key.into()),
162 EnrichmentTableOuter::new(inputs, enrichment_table),
163 );
164 }
165
166 pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
167 self.sources
168 .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
169 }
170
171 pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
172 &mut self,
173 key: K,
174 inputs: &[&str],
175 sink: S,
176 ) {
177 let inputs = inputs
178 .iter()
179 .map(|value| value.to_string())
180 .collect::<Vec<_>>();
181 let sink = SinkOuter::new(inputs, sink);
182 self.add_sink_outer(key, sink);
183 }
184
185 pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
186 self.sinks.insert(ComponentKey::from(key.into()), sink);
187 }
188
189 pub fn add_transform(
192 &mut self,
193 key: impl Into<String>,
194 inputs: &[&str],
195 transform: impl Into<BoxedTransform>,
196 ) {
197 let inputs = inputs
198 .iter()
199 .map(|value| value.to_string())
200 .collect::<Vec<_>>();
201 let transform = TransformOuter::new(inputs, transform);
202
203 self.transforms
204 .insert(ComponentKey::from(key.into()), transform);
205 }
206
207 pub fn set_data_dir(&mut self, path: &Path) {
208 self.global.data_dir = Some(path.to_owned());
209 }
210
211 pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
212 let mut errors = Vec::new();
213
214 #[cfg(feature = "api")]
215 if let Err(error) = self.api.merge(with.api) {
216 errors.push(error);
217 }
218
219 self.provider = with.provider;
220
221 match self.global.merge(with.global) {
222 Err(errs) => errors.extend(errs),
223 Ok(new_global) => self.global = new_global,
224 }
225
226 self.schema.append(with.schema, &mut errors);
227
228 self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
229
230 self.healthchecks.merge(with.healthchecks);
231
232 with.enrichment_tables.keys().for_each(|k| {
233 if self.enrichment_tables.contains_key(k) {
234 errors.push(format!("duplicate enrichment_table name found: {k}"));
235 }
236 });
237 with.sources.keys().for_each(|k| {
238 if self.sources.contains_key(k) {
239 errors.push(format!("duplicate source id found: {k}"));
240 }
241 });
242 with.sinks.keys().for_each(|k| {
243 if self.sinks.contains_key(k) {
244 errors.push(format!("duplicate sink id found: {k}"));
245 }
246 });
247 with.transforms.keys().for_each(|k| {
248 if self.transforms.contains_key(k) {
249 errors.push(format!("duplicate transform id found: {k}"));
250 }
251 });
252 with.tests.iter().for_each(|wt| {
253 if self.tests.iter().any(|t| t.name == wt.name) {
254 errors.push(format!("duplicate test name found: {}", wt.name));
255 }
256 });
257 with.secret.keys().for_each(|k| {
258 if self.secret.contains_key(k) {
259 errors.push(format!("duplicate secret id found: {k}"));
260 }
261 });
262 if !errors.is_empty() {
263 return Err(errors);
264 }
265
266 self.enrichment_tables.extend(with.enrichment_tables);
267 self.sources.extend(with.sources);
268 self.sinks.extend(with.sinks);
269 self.transforms.extend(with.transforms);
270 self.tests.extend(with.tests);
271 self.secret.extend(with.secret);
272
273 Ok(())
274 }
275
276 #[cfg(test)]
277 pub fn from_toml(input: &str) -> Self {
278 crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
279 }
280
281 #[cfg(test)]
282 pub fn from_json(input: &str) -> Self {
283 crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
284 }
285}