1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::super::default_data_dir;
8use super::metrics_expiration::PerMetricSetExpiration;
9use super::Telemetry;
10use super::{proxy::ProxyConfig, AcknowledgementsConfig, LogSchema};
11use crate::serde::bool_or_struct;
12
13#[derive(Debug, Snafu)]
14pub(crate) enum DataDirError {
15 #[snafu(display("data_dir option required, but not given here or globally"))]
16 MissingDataDir,
17 #[snafu(display("data_dir {:?} does not exist", data_dir))]
18 DoesNotExist { data_dir: PathBuf },
19 #[snafu(display("data_dir {:?} is not writable", data_dir))]
20 NotWritable { data_dir: PathBuf },
21 #[snafu(display(
22 "Could not create subdirectory {:?} inside of data dir {:?}: {}",
23 subdir,
24 data_dir,
25 source
26 ))]
27 CouldNotCreate {
28 subdir: PathBuf,
29 data_dir: PathBuf,
30 source: std::io::Error,
31 },
32}
33
34#[configurable_component]
36#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
37#[serde(rename_all = "lowercase")]
38pub enum WildcardMatching {
39 #[default]
41 Strict,
42
43 Relaxed,
45}
46
47#[configurable_component(global_option("global_option"))]
52#[derive(Clone, Debug, Default, PartialEq)]
53pub struct GlobalOptions {
54 #[serde(default = "crate::default_data_dir")]
61 #[configurable(metadata(docs::common = false))]
62 pub data_dir: Option<PathBuf>,
63
64 #[serde(skip_serializing_if = "crate::serde::is_default")]
69 #[configurable(metadata(docs::common = false, docs::required = false))]
70 pub wildcard_matching: Option<WildcardMatching>,
71
72 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
77 #[configurable(metadata(docs::common = false, docs::required = false))]
78 pub log_schema: LogSchema,
79
80 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
85 #[configurable(metadata(docs::common = false, docs::required = false))]
86 pub telemetry: Telemetry,
87
88 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
97 #[configurable(metadata(docs::common = false))]
98 pub timezone: Option<TimeZone>,
99
100 #[configurable(derived)]
101 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
102 #[configurable(metadata(docs::common = false, docs::required = false))]
103 pub proxy: ProxyConfig,
104
105 #[serde(
112 default,
113 deserialize_with = "bool_or_struct",
114 skip_serializing_if = "crate::serde::is_default"
115 )]
116 #[configurable(metadata(docs::common = true, docs::required = false))]
117 pub acknowledgements: AcknowledgementsConfig,
118
119 #[configurable(deprecated)]
124 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
125 #[configurable(metadata(docs::hidden))]
126 pub expire_metrics: Option<Duration>,
127
128 #[serde(skip_serializing_if = "crate::serde::is_default")]
134 #[configurable(metadata(docs::common = false, docs::required = false))]
135 pub expire_metrics_secs: Option<f64>,
136
137 #[serde(skip_serializing_if = "crate::serde::is_default")]
141 pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,
142}
143
144impl_generate_config_from_default!(GlobalOptions);
145
146impl GlobalOptions {
147 pub fn resolve_and_validate_data_dir(
154 &self,
155 local_data_dir: Option<&PathBuf>,
156 ) -> crate::Result<PathBuf> {
157 let data_dir = local_data_dir
158 .or(self.data_dir.as_ref())
159 .ok_or(DataDirError::MissingDataDir)
160 .map_err(Box::new)?
161 .clone();
162 if !data_dir.exists() {
163 return Err(DataDirError::DoesNotExist { data_dir }.into());
164 }
165 let readonly = std::fs::metadata(&data_dir)
166 .map(|meta| meta.permissions().readonly())
167 .unwrap_or(true);
168 if readonly {
169 return Err(DataDirError::NotWritable { data_dir }.into());
170 }
171 Ok(data_dir)
172 }
173
174 pub fn resolve_and_make_data_subdir(
181 &self,
182 local: Option<&PathBuf>,
183 subdir: &str,
184 ) -> crate::Result<PathBuf> {
185 let data_dir = self.resolve_and_validate_data_dir(local)?;
186
187 let mut data_subdir = data_dir.clone();
188 data_subdir.push(subdir);
189
190 DirBuilder::new()
191 .recursive(true)
192 .create(&data_subdir)
193 .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
194 Ok(data_subdir)
195 }
196
197 pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
204 let mut errors = Vec::new();
205
206 if conflicts(
207 self.wildcard_matching.as_ref(),
208 with.wildcard_matching.as_ref(),
209 ) {
210 errors.push("conflicting values for 'wildcard_matching' found".to_owned());
211 }
212
213 if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
214 errors.push("conflicting values for 'proxy.http' found".to_owned());
215 }
216
217 if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
218 errors.push("conflicting values for 'proxy.https' found".to_owned());
219 }
220
221 if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
222 errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
223 }
224
225 if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
226 errors.push("conflicting values for 'timezone' found".to_owned());
227 }
228
229 if conflicts(
230 self.acknowledgements.enabled.as_ref(),
231 with.acknowledgements.enabled.as_ref(),
232 ) {
233 errors.push("conflicting values for 'acknowledgements' found".to_owned());
234 }
235
236 if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
237 errors.push("conflicting values for 'expire_metrics' found".to_owned());
238 }
239
240 if conflicts(
241 self.expire_metrics_secs.as_ref(),
242 with.expire_metrics_secs.as_ref(),
243 ) {
244 errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
245 }
246
247 let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
248 with.data_dir
249 } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
250 errors.push("conflicting values for 'data_dir' found".to_owned());
253 None
254 } else {
255 self.data_dir.clone()
256 };
257
258 let mut log_schema = self.log_schema.clone();
261 if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
262 errors.extend(merge_errors);
263 }
264
265 let mut telemetry = self.telemetry.clone();
266 telemetry.merge(&with.telemetry);
267
268 let merged_expire_metrics_per_metric_set = match (
269 &self.expire_metrics_per_metric_set,
270 &with.expire_metrics_per_metric_set,
271 ) {
272 (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
273 (Some(a), None) => Some(a.clone()),
274 (None, Some(b)) => Some(b.clone()),
275 (None, None) => None,
276 };
277
278 if errors.is_empty() {
279 Ok(Self {
280 data_dir,
281 wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
282 log_schema,
283 telemetry,
284 acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
285 timezone: self.timezone.or(with.timezone),
286 proxy: self.proxy.merge(&with.proxy),
287 expire_metrics: self.expire_metrics.or(with.expire_metrics),
288 expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
289 expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
290 })
291 } else {
292 Err(errors)
293 }
294 }
295
296 pub fn timezone(&self) -> TimeZone {
298 self.timezone.unwrap_or(TimeZone::Local)
299 }
300}
301
/// Returns `true` only when both values are present and differ.
///
/// A value that is missing on either side never counts as a conflict.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    match (this, that) {
        (Some(ours), Some(theirs)) => ours != theirs,
        _ => false,
    }
}
305
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    /// Parses a `GlobalOptions` from a TOML document that assigns the `Debug`
    /// rendering of `value` to `name`, or from an empty document when `value`
    /// is `None`.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        let document = match value {
            Some(value) => format!(r"{name} = {value:?}"),
            None => String::new(),
        };
        toml::from_str(&document).unwrap()
    }

    /// Builds one config per input value, merges the second into the first,
    /// and projects the merged options through `result`.
    fn merge<P: Debug, T>(
        name: &str,
        dd1: Option<P>,
        dd2: Option<P>,
        result: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        let base = make_config(name, dd1);
        let overlay = make_config(name, dd2);
        base.merge(overlay).map(result)
    }

    #[test]
    fn merges_data_dir() {
        let merged = |a, b| merge("data_dir", a, b, |options| options.data_dir);

        assert_eq!(merged(None, None), Ok(default_data_dir()));
        assert_eq!(
            merged(Some("/test1"), None),
            Ok(Some(PathBuf::from("/test1")))
        );
        assert_eq!(
            merged(None, Some("/test2")),
            Ok(Some(PathBuf::from("/test2")))
        );
        assert_eq!(
            merged(Some("/test3"), Some("/test3")),
            Ok(Some(PathBuf::from("/test3")))
        );
        assert_eq!(
            merged(Some("/test4"), Some("/test5")),
            Err(vec!["conflicting values for 'data_dir' found".to_owned()])
        );
    }

    #[test]
    fn merges_timezones() {
        let merged = |a, b| merge("timezone", a, b, |options| options.timezone());

        // Unset and explicit "local" both resolve to local time.
        assert_eq!(merged(None, None), Ok(TimeZone::Local));
        assert_eq!(merged(Some("local"), None), Ok(TimeZone::Local));
        assert_eq!(merged(None, Some("local")), Ok(TimeZone::Local));
        assert_eq!(merged(Some("local"), Some("local")), Ok(TimeZone::Local));

        // Named zones survive from either side, or from both when equal.
        assert_eq!(merged(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
        assert_eq!(
            merged(None, Some("EST5EDT")),
            Ok(TimeZone::Named(Tz::EST5EDT))
        );
        assert_eq!(
            merged(Some("UTC"), Some("UTC")),
            Ok(TimeZone::Named(Tz::UTC))
        );

        assert_eq!(
            merged(Some("CST6CDT"), Some("GMT")),
            Err(vec!["conflicting values for 'timezone' found".to_owned()])
        );
    }

    #[test]
    fn merges_proxy() {
        // Only `proxy.http` is exercised here.
        let merged = |a, b| merge("proxy.http", a, b, |options| options.proxy.http);

        assert_eq!(merged(None, None), Ok(None));
        assert_eq!(merged(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(merged(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            merged(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            merged(Some("test4"), Some("test5")),
            Err(vec!["conflicting values for 'proxy.http' found".to_owned()])
        );
    }

    #[test]
    fn merges_acknowledgements() {
        let merged = |a, b| {
            merge("acknowledgements", a, b, |options| {
                options.acknowledgements
            })
        };

        assert_eq!(merged(None, None), Ok(None.into()));
        assert_eq!(merged(Some(false), None), Ok(false.into()));
        assert_eq!(merged(Some(true), None), Ok(true.into()));
        assert_eq!(merged(None, Some(false)), Ok(false.into()));
        assert_eq!(merged(None, Some(true)), Ok(true.into()));
        assert_eq!(merged(Some(false), Some(false)), Ok(false.into()));
        assert_eq!(merged(Some(true), Some(true)), Ok(true.into()));

        // Opposite values conflict regardless of which side holds which.
        assert_eq!(
            merged(Some(false), Some(true)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".to_owned()
            ])
        );
        assert_eq!(
            merged(Some(true), Some(false)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".to_owned()
            ])
        );
    }

    #[test]
    fn merges_expire_metrics() {
        let merged = |a, b| {
            merge("expire_metrics_secs", a, b, |options| {
                options.expire_metrics_secs
            })
        };

        assert_eq!(merged(None, None), Ok(None));
        assert_eq!(merged(Some(1.0), None), Ok(Some(1.0)));
        assert_eq!(merged(None, Some(2.0)), Ok(Some(2.0)));
        assert_eq!(merged(Some(3.0), Some(3.0)), Ok(Some(3.0)));
        assert_eq!(
            merged(Some(4.0), Some(5.0)),
            Err(vec![
                "conflicting values for 'expire_metrics_secs' found".to_owned()
            ])
        );
    }
}