1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::{
8 super::default_data_dir, AcknowledgementsConfig, LogSchema, Telemetry,
9 metrics_expiration::PerMetricSetExpiration, proxy::ProxyConfig,
10};
11use crate::serde::bool_or_struct;
12
/// Errors produced while resolving, validating, or preparing the data directory.
#[derive(Debug, Snafu)]
pub(crate) enum DataDirError {
    /// No data directory was supplied, neither by the caller nor in the global options.
    #[snafu(display("data_dir option required, but not given here or globally"))]
    MissingDataDir,
    /// The resolved data directory does not exist.
    #[snafu(display("data_dir {:?} does not exist", data_dir))]
    DoesNotExist { data_dir: PathBuf },
    /// The resolved data directory is marked read-only, or its metadata could not be read.
    #[snafu(display("data_dir {:?} is not writable", data_dir))]
    NotWritable { data_dir: PathBuf },
    /// Creating `subdir` inside `data_dir` failed; `source` is the underlying I/O error.
    #[snafu(display(
        "Could not create subdirectory {:?} inside of data dir {:?}: {}",
        subdir,
        data_dir,
        source
    ))]
    CouldNotCreate {
        subdir: PathBuf,
        data_dir: PathBuf,
        source: std::io::Error,
    },
}
33
/// The wildcard matching mode.
//
// NOTE(review): what each mode actually changes is decided where this option is consumed,
// outside this file. Variants serialize as lowercase names (`strict`, `relaxed`).
#[configurable_component]
#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum WildcardMatching {
    /// Strict matching. This is the default mode.
    #[default]
    Strict,

    /// Relaxed matching.
    Relaxed,
}
46
/// Global configuration options.
///
/// Several instances can be combined with [`GlobalOptions::merge`], which rejects conflicting
/// values.
#[configurable_component(global_option("global_option"))]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
    /// The directory used for persisting data.
    ///
    /// The directory must already exist and be writable. Subdirectories are created inside it
    /// on demand.
    //
    // When the option is omitted, deserialization uses `crate::default_data_dir()`.
    #[serde(default = "crate::default_data_dir")]
    #[configurable(metadata(docs::common = false))]
    pub data_dir: Option<PathBuf>,

    /// The wildcard matching mode.
    //
    // NOTE(review): the effect of each mode is defined by the consumers of this option, which
    // are not part of this file.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub wildcard_matching: Option<WildcardMatching>,

    /// The log schema settings.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub log_schema: LogSchema,

    /// The telemetry settings.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub telemetry: Telemetry,

    /// The time zone to use.
    ///
    /// When unset, the local time zone is used.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false))]
    pub timezone: Option<TimeZone>,

    // Proxy settings; the schema description is derived from `ProxyConfig` itself.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub proxy: ProxyConfig,

    /// The acknowledgement settings.
    ///
    /// May be given either as a plain boolean or in its full struct form.
    #[serde(
        default,
        deserialize_with = "bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(metadata(docs::common = true, docs::required = false))]
    pub acknowledgements: AcknowledgementsConfig,

    /// Deprecated metrics expiration setting, expressed as a duration.
    //
    // NOTE(review): presumably superseded by `expire_metrics_secs` — confirm before removing.
    #[configurable(deprecated)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::hidden))]
    pub expire_metrics: Option<Duration>,

    /// The metrics expiration time, in seconds. Fractional values are accepted.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub expire_metrics_secs: Option<f64>,

    /// Expiration settings that apply to individual metric sets.
    ///
    /// When several configurations are merged, their lists are concatenated.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,
}
143
// NOTE(review): going by its name, this macro implements config generation for
// `GlobalOptions` on top of its `Default` impl — confirm against the macro definition in
// `vector_config`.
impl_generate_config_from_default!(GlobalOptions);
145
146impl GlobalOptions {
147 pub fn resolve_and_validate_data_dir(
154 &self,
155 local_data_dir: Option<&PathBuf>,
156 ) -> crate::Result<PathBuf> {
157 let data_dir = local_data_dir
158 .or(self.data_dir.as_ref())
159 .ok_or(DataDirError::MissingDataDir)
160 .map_err(Box::new)?
161 .clone();
162 if !data_dir.exists() {
163 return Err(DataDirError::DoesNotExist { data_dir }.into());
164 }
165 let readonly = std::fs::metadata(&data_dir)
166 .map(|meta| meta.permissions().readonly())
167 .unwrap_or(true);
168 if readonly {
169 return Err(DataDirError::NotWritable { data_dir }.into());
170 }
171 Ok(data_dir)
172 }
173
174 pub fn resolve_and_make_data_subdir(
181 &self,
182 local: Option<&PathBuf>,
183 subdir: &str,
184 ) -> crate::Result<PathBuf> {
185 let data_dir = self.resolve_and_validate_data_dir(local)?;
186
187 let mut data_subdir = data_dir.clone();
188 data_subdir.push(subdir);
189
190 DirBuilder::new()
191 .recursive(true)
192 .create(&data_subdir)
193 .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
194 Ok(data_subdir)
195 }
196
197 pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
204 let mut errors = Vec::new();
205
206 if conflicts(
207 self.wildcard_matching.as_ref(),
208 with.wildcard_matching.as_ref(),
209 ) {
210 errors.push("conflicting values for 'wildcard_matching' found".to_owned());
211 }
212
213 if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
214 errors.push("conflicting values for 'proxy.http' found".to_owned());
215 }
216
217 if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
218 errors.push("conflicting values for 'proxy.https' found".to_owned());
219 }
220
221 if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
222 errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
223 }
224
225 if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
226 errors.push("conflicting values for 'timezone' found".to_owned());
227 }
228
229 if conflicts(
230 self.acknowledgements.enabled.as_ref(),
231 with.acknowledgements.enabled.as_ref(),
232 ) {
233 errors.push("conflicting values for 'acknowledgements' found".to_owned());
234 }
235
236 if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
237 errors.push("conflicting values for 'expire_metrics' found".to_owned());
238 }
239
240 if conflicts(
241 self.expire_metrics_secs.as_ref(),
242 with.expire_metrics_secs.as_ref(),
243 ) {
244 errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
245 }
246
247 let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
248 with.data_dir
249 } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
250 errors.push("conflicting values for 'data_dir' found".to_owned());
253 None
254 } else {
255 self.data_dir.clone()
256 };
257
258 let mut log_schema = self.log_schema.clone();
261 if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
262 errors.extend(merge_errors);
263 }
264
265 let mut telemetry = self.telemetry.clone();
266 telemetry.merge(&with.telemetry);
267
268 let merged_expire_metrics_per_metric_set = match (
269 &self.expire_metrics_per_metric_set,
270 &with.expire_metrics_per_metric_set,
271 ) {
272 (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
273 (Some(a), None) => Some(a.clone()),
274 (None, Some(b)) => Some(b.clone()),
275 (None, None) => None,
276 };
277
278 if errors.is_empty() {
279 Ok(Self {
280 data_dir,
281 wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
282 log_schema,
283 telemetry,
284 acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
285 timezone: self.timezone.or(with.timezone),
286 proxy: self.proxy.merge(&with.proxy),
287 expire_metrics: self.expire_metrics.or(with.expire_metrics),
288 expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
289 expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
290 })
291 } else {
292 Err(errors)
293 }
294 }
295
296 pub fn timezone(&self) -> TimeZone {
298 self.timezone.unwrap_or(TimeZone::Local)
299 }
300
301 pub fn diff(&self, other: &Self) -> Result<Vec<String>, serde_json::Error> {
315 let old_value = serde_json::to_value(self)?;
316 let new_value = serde_json::to_value(other)?;
317
318 let serde_json::Value::Object(old_map) = old_value else {
319 return Ok(vec![]);
320 };
321 let serde_json::Value::Object(new_map) = new_value else {
322 return Ok(vec![]);
323 };
324
325 Ok(old_map
326 .iter()
327 .filter_map(|(k, v_old)| match new_map.get(k) {
328 Some(v_new) if v_new != v_old => Some(k.clone()),
329 _ => None,
330 })
331 .collect())
332 }
333}
334
/// Returns `true` only when both values are present and differ from each other.
///
/// A value missing on either side never counts as a conflict.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    match (this, that) {
        (Some(lhs), Some(rhs)) => lhs != rhs,
        _ => false,
    }
}
338
339#[cfg(test)]
340mod tests {
341 use std::fmt::Debug;
342
343 use chrono_tz::Tz;
344
345 use super::*;
346
347 #[test]
348 fn merges_data_dir() {
349 let merge = |a, b| merge("data_dir", a, b, |result| result.data_dir);
350
351 assert_eq!(merge(None, None), Ok(default_data_dir()));
352 assert_eq!(merge(Some("/test1"), None), Ok(Some("/test1".into())));
353 assert_eq!(merge(None, Some("/test2")), Ok(Some("/test2".into())));
354 assert_eq!(
355 merge(Some("/test3"), Some("/test3")),
356 Ok(Some("/test3".into()))
357 );
358 assert_eq!(
359 merge(Some("/test4"), Some("/test5")),
360 Err(vec!["conflicting values for 'data_dir' found".into()])
361 );
362 }
363
364 #[test]
365 fn merges_timezones() {
366 let merge = |a, b| merge("timezone", a, b, |result| result.timezone());
367
368 assert_eq!(merge(None, None), Ok(TimeZone::Local));
369 assert_eq!(merge(Some("local"), None), Ok(TimeZone::Local));
370 assert_eq!(merge(None, Some("local")), Ok(TimeZone::Local));
371 assert_eq!(merge(Some("local"), Some("local")), Ok(TimeZone::Local),);
372 assert_eq!(merge(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
373 assert_eq!(
374 merge(None, Some("EST5EDT")),
375 Ok(TimeZone::Named(Tz::EST5EDT))
376 );
377 assert_eq!(
378 merge(Some("UTC"), Some("UTC")),
379 Ok(TimeZone::Named(Tz::UTC))
380 );
381 assert_eq!(
382 merge(Some("CST6CDT"), Some("GMT")),
383 Err(vec!["conflicting values for 'timezone' found".into()])
384 );
385 }
386
387 #[test]
388 fn merges_proxy() {
389 let merge = |a, b| merge("proxy.http", a, b, |result| result.proxy.http);
392
393 assert_eq!(merge(None, None), Ok(None));
394 assert_eq!(merge(Some("test1"), None), Ok(Some("test1".into())));
395 assert_eq!(merge(None, Some("test2")), Ok(Some("test2".into())));
396 assert_eq!(
397 merge(Some("test3"), Some("test3")),
398 Ok(Some("test3".into()))
399 );
400 assert_eq!(
401 merge(Some("test4"), Some("test5")),
402 Err(vec!["conflicting values for 'proxy.http' found".into()])
403 );
404 }
405
406 #[test]
407 fn merges_acknowledgements() {
408 let merge = |a, b| merge("acknowledgements", a, b, |result| result.acknowledgements);
409
410 assert_eq!(merge(None, None), Ok(None.into()));
411 assert_eq!(merge(Some(false), None), Ok(false.into()));
412 assert_eq!(merge(Some(true), None), Ok(true.into()));
413 assert_eq!(merge(None, Some(false)), Ok(false.into()));
414 assert_eq!(merge(None, Some(true)), Ok(true.into()));
415 assert_eq!(merge(Some(false), Some(false)), Ok(false.into()));
416 assert_eq!(merge(Some(true), Some(true)), Ok(true.into()));
417 assert_eq!(
418 merge(Some(false), Some(true)),
419 Err(vec![
420 "conflicting values for 'acknowledgements' found".into()
421 ])
422 );
423 assert_eq!(
424 merge(Some(true), Some(false)),
425 Err(vec![
426 "conflicting values for 'acknowledgements' found".into()
427 ])
428 );
429 }
430
431 #[test]
432 fn merges_expire_metrics() {
433 let merge = |a, b| {
434 merge("expire_metrics_secs", a, b, |result| {
435 result.expire_metrics_secs
436 })
437 };
438
439 assert_eq!(merge(None, None), Ok(None));
440 assert_eq!(merge(Some(1.0), None), Ok(Some(1.0)));
441 assert_eq!(merge(None, Some(2.0)), Ok(Some(2.0)));
442 assert_eq!(merge(Some(3.0), Some(3.0)), Ok(Some(3.0)));
443 assert_eq!(
444 merge(Some(4.0), Some(5.0)),
445 Err(vec![
446 "conflicting values for 'expire_metrics_secs' found".into()
447 ])
448 );
449 }
450
451 #[test]
452 fn diff_detects_changed_keys() {
453 let old = GlobalOptions {
454 data_dir: Some(std::path::PathBuf::from("/path1")),
455 ..Default::default()
456 };
457 let new = GlobalOptions {
458 data_dir: Some(std::path::PathBuf::from("/path2")),
459 ..Default::default()
460 };
461 assert_eq!(
462 old.diff(&new).expect("diff failed"),
463 vec!["data_dir".to_string()]
464 );
465 }
466
467 fn merge<P: Debug, T>(
468 name: &str,
469 dd1: Option<P>,
470 dd2: Option<P>,
471 result: impl Fn(GlobalOptions) -> T,
472 ) -> Result<T, Vec<String>> {
473 make_config(name, dd1)
475 .merge(make_config(name, dd2))
476 .map(result)
477 }
478
479 fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
480 toml::from_str(&value.map_or(String::new(), |value| format!(r"{name} = {value:?}")))
481 .unwrap()
482 }
483}