1use std::{path::Path, time::Duration};
2
3use indexmap::IndexMap;
4use vector_lib::{config::GlobalOptions, configurable::configurable_component};
5
6#[cfg(feature = "api")]
7use super::api;
8use super::{
9 BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config, EnrichmentTableOuter,
10 HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition, TransformOuter, compiler, schema,
11};
12use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};
13
/// A complete configuration in builder form.
///
/// This is the deserializable, mergeable representation of a configuration. It is turned into a
/// `Config` by `ConfigBuilder::build`, and unknown top-level fields are rejected while
/// deserializing.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
    // Global options. Flattened by serde, so they are (de)serialized alongside the fields below
    // rather than nested under a `global` key.
    #[serde(flatten)]
    pub global: GlobalOptions,

    // API options; only compiled in with the `api` feature. The description is derived from
    // `api::Options`, hence a plain comment here instead of a doc comment.
    #[cfg(feature = "api")]
    #[configurable(derived)]
    #[serde(default)]
    pub api: api::Options,

    // Schema options; description derived from `schema::Options` and hidden from generated docs.
    #[configurable(derived)]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub schema: schema::Options,

    // Healthcheck options; description derived from `HealthcheckOptions`.
    #[configurable(derived)]
    #[serde(default)]
    pub healthchecks: HealthcheckOptions,

    #[serde(default)]
    /// All configured enrichment tables, keyed by component key.
    pub enrichment_tables: IndexMap<ComponentKey, EnrichmentTableOuter<String>>,

    #[serde(default)]
    /// All configured sources, keyed by component key.
    pub sources: IndexMap<ComponentKey, SourceOuter>,

    #[serde(default)]
    /// All configured sinks, keyed by component key.
    pub sinks: IndexMap<ComponentKey, SinkOuter<String>>,

    #[serde(default)]
    /// All configured transforms, keyed by component key.
    pub transforms: IndexMap<ComponentKey, TransformOuter<String>>,

    #[serde(default)]
    /// All configured tests.
    pub tests: Vec<TestDefinition<String>>,

    /// Optional configuration provider to use.
    // Not taken from `Config`: the `From<Config>` conversion in this file always sets `None`.
    pub provider: Option<Providers>,

    #[serde(default)]
    /// All configured secret backends, keyed by component key.
    pub secret: IndexMap<ComponentKey, SecretBackends>,

    #[serde(default, skip)]
    /// Graceful shutdown duration.
    ///
    /// Skipped by serde, so it is never read from or written to a configuration document and
    /// has to be set programmatically.
    // NOTE(review): presumably the time to wait for a graceful shutdown before forcing one;
    // confirm against the code that consumes this value.
    #[doc(hidden)]
    pub graceful_shutdown_duration: Option<Duration>,

    #[serde(default, skip)]
    /// Whether or not to allow an empty configuration.
    ///
    /// Skipped by serde, so it is `false` unless set programmatically.
    // NOTE(review): the exact meaning of "empty" is decided by the code that reads this flag
    // (not visible here); confirm before relying on it.
    #[doc(hidden)]
    pub allow_empty: bool,
}
78
79impl From<Config> for ConfigBuilder {
80 fn from(config: Config) -> Self {
81 let Config {
82 global,
83 #[cfg(feature = "api")]
84 api,
85 schema,
86 healthchecks,
87 enrichment_tables,
88 sources,
89 sinks,
90 transforms,
91 tests,
92 secret,
93 graceful_shutdown_duration,
94 } = config;
95
96 let transforms = transforms
97 .into_iter()
98 .map(|(key, transform)| (key, transform.map_inputs(ToString::to_string)))
99 .collect();
100
101 let sinks = sinks
102 .into_iter()
103 .map(|(key, sink)| (key, sink.map_inputs(ToString::to_string)))
104 .collect();
105
106 let enrichment_tables = enrichment_tables
107 .into_iter()
108 .map(|(key, table)| (key, table.map_inputs(ToString::to_string)))
109 .collect();
110
111 let tests = tests.into_iter().map(TestDefinition::stringify).collect();
112
113 ConfigBuilder {
114 global,
115 #[cfg(feature = "api")]
116 api,
117 schema,
118 healthchecks,
119 enrichment_tables,
120 sources,
121 sinks,
122 transforms,
123 provider: None,
124 tests,
125 secret,
126 graceful_shutdown_duration,
127 allow_empty: false,
128 }
129 }
130}
131
132impl ConfigBuilder {
133 pub fn build(self) -> Result<Config, Vec<String>> {
134 let (config, warnings) = self.build_with_warnings()?;
135
136 for warning in warnings {
137 warn!("{}", warning);
138 }
139
140 Ok(config)
141 }
142
143 pub fn build_with_warnings(self) -> Result<(Config, Vec<String>), Vec<String>> {
144 compiler::compile(self)
145 }
146
147 pub fn add_enrichment_table<K: Into<String>, E: Into<EnrichmentTables>>(
148 &mut self,
149 key: K,
150 inputs: &[&str],
151 enrichment_table: E,
152 ) {
153 let inputs = inputs
154 .iter()
155 .map(|value| value.to_string())
156 .collect::<Vec<_>>();
157 self.enrichment_tables.insert(
158 ComponentKey::from(key.into()),
159 EnrichmentTableOuter::new(inputs, enrichment_table),
160 );
161 }
162
163 pub fn add_source<K: Into<String>, S: Into<BoxedSource>>(&mut self, key: K, source: S) {
164 self.sources
165 .insert(ComponentKey::from(key.into()), SourceOuter::new(source));
166 }
167
168 pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
169 &mut self,
170 key: K,
171 inputs: &[&str],
172 sink: S,
173 ) {
174 let inputs = inputs
175 .iter()
176 .map(|value| value.to_string())
177 .collect::<Vec<_>>();
178 let sink = SinkOuter::new(inputs, sink);
179 self.add_sink_outer(key, sink);
180 }
181
182 pub fn add_sink_outer<K: Into<String>>(&mut self, key: K, sink: SinkOuter<String>) {
183 self.sinks.insert(ComponentKey::from(key.into()), sink);
184 }
185
186 pub fn add_transform(
189 &mut self,
190 key: impl Into<String>,
191 inputs: &[&str],
192 transform: impl Into<BoxedTransform>,
193 ) {
194 let inputs = inputs
195 .iter()
196 .map(|value| value.to_string())
197 .collect::<Vec<_>>();
198 let transform = TransformOuter::new(inputs, transform);
199
200 self.transforms
201 .insert(ComponentKey::from(key.into()), transform);
202 }
203
204 pub fn set_data_dir(&mut self, path: &Path) {
205 self.global.data_dir = Some(path.to_owned());
206 }
207
208 pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
209 let mut errors = Vec::new();
210
211 #[cfg(feature = "api")]
212 if let Err(error) = self.api.merge(with.api) {
213 errors.push(error);
214 }
215
216 self.provider = with.provider;
217
218 match self.global.merge(with.global) {
219 Err(errs) => errors.extend(errs),
220 Ok(new_global) => self.global = new_global,
221 }
222
223 self.schema.append(with.schema, &mut errors);
224
225 self.schema.log_namespace = self.schema.log_namespace.or(with.schema.log_namespace);
226
227 self.healthchecks.merge(with.healthchecks);
228
229 with.enrichment_tables.keys().for_each(|k| {
230 if self.enrichment_tables.contains_key(k) {
231 errors.push(format!("duplicate enrichment_table name found: {k}"));
232 }
233 });
234 with.sources.keys().for_each(|k| {
235 if self.sources.contains_key(k) {
236 errors.push(format!("duplicate source id found: {k}"));
237 }
238 });
239 with.sinks.keys().for_each(|k| {
240 if self.sinks.contains_key(k) {
241 errors.push(format!("duplicate sink id found: {k}"));
242 }
243 });
244 with.transforms.keys().for_each(|k| {
245 if self.transforms.contains_key(k) {
246 errors.push(format!("duplicate transform id found: {k}"));
247 }
248 });
249 with.tests.iter().for_each(|wt| {
250 if self.tests.iter().any(|t| t.name == wt.name) {
251 errors.push(format!("duplicate test name found: {}", wt.name));
252 }
253 });
254 with.secret.keys().for_each(|k| {
255 if self.secret.contains_key(k) {
256 errors.push(format!("duplicate secret id found: {k}"));
257 }
258 });
259 if !errors.is_empty() {
260 return Err(errors);
261 }
262
263 self.enrichment_tables.extend(with.enrichment_tables);
264 self.sources.extend(with.sources);
265 self.sinks.extend(with.sinks);
266 self.transforms.extend(with.transforms);
267 self.tests.extend(with.tests);
268 self.secret.extend(with.secret);
269
270 Ok(())
271 }
272
273 #[cfg(test)]
274 pub fn from_toml(input: &str) -> Self {
275 crate::config::format::deserialize(input, crate::config::format::Format::Toml).unwrap()
276 }
277
278 #[cfg(test)]
279 pub fn from_json(input: &str) -> Self {
280 crate::config::format::deserialize(input, crate::config::format::Format::Json).unwrap()
281 }
282}