1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::{
8 super::default_data_dir, AcknowledgementsConfig, LogSchema, Telemetry,
9 metrics_expiration::PerMetricSetExpiration, proxy::ProxyConfig,
10};
11use crate::serde::bool_or_struct;
12
13#[expect(
14 clippy::ref_option,
15 reason = "we have to follow the serde calling convention"
16)]
17fn is_default_buffer_utilization_ewma_half_life_seconds(value: &Option<f64>) -> bool {
18 value.is_none_or(|seconds| {
19 seconds == vector_buffers::topology::channel::DEFAULT_EWMA_HALF_LIFE_SECONDS
20 })
21}
22
/// Errors produced while resolving, validating, or preparing the data directory.
#[derive(Debug, Snafu)]
pub(crate) enum DataDirError {
    /// Neither a component-local nor a global `data_dir` value was available.
    #[snafu(display("data_dir option required, but not given here or globally"))]
    MissingDataDir,
    /// The resolved data directory path does not exist on disk.
    #[snafu(display("data_dir {:?} does not exist", data_dir))]
    DoesNotExist { data_dir: PathBuf },
    /// The resolved data directory is read-only, or its metadata could not be read.
    #[snafu(display("data_dir {:?} is not writable", data_dir))]
    NotWritable { data_dir: PathBuf },
    /// Creating the requested subdirectory inside the data directory failed.
    #[snafu(display(
        "Could not create subdirectory {:?} inside of data dir {:?}: {}",
        subdir,
        data_dir,
        source
    ))]
    CouldNotCreate {
        // The subdirectory name that was requested (relative to `data_dir`).
        subdir: PathBuf,
        // The validated data directory the subdirectory was to be created in.
        data_dir: PathBuf,
        // The underlying I/O error reported by the directory builder.
        source: std::io::Error,
    },
}
43
#[configurable_component]
/// Wildcard matching mode.
///
/// Serialized in lowercase, i.e. as `strict` or `relaxed`.
// NOTE(review): what exactly each mode permits is decided by the code that consumes this
// value, which is not visible in this file; confirm before expanding these descriptions.
#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum WildcardMatching {
    /// Strict matching. This is the default mode.
    #[default]
    Strict,

    /// Relaxed matching.
    Relaxed,
}
56
#[configurable_component]
/// Global configuration options.
///
/// Options from several configuration sources can be combined with `merge`, which
/// reports conflicting values as errors.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
    /// The directory used for persisting data.
    ///
    /// When the directory is resolved for use it must already exist and be writable.
    /// A component-local value, when given, takes precedence over this global one.
    // Note: deserialization fills in `crate::default_data_dir()` when the key is absent,
    // whereas the derived `Default` impl leaves this field as `None`.
    #[serde(default = "crate::default_data_dir")]
    #[configurable(metadata(docs::common = false))]
    pub data_dir: Option<PathBuf>,

    /// The wildcard matching mode.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub wildcard_matching: Option<WildcardMatching>,

    /// Log schema options.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub log_schema: LogSchema,

    /// Telemetry options.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub telemetry: Telemetry,

    /// The configured time zone.
    ///
    /// When unset, the local time zone is used.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false))]
    pub timezone: Option<TimeZone>,

    // Proxy options; the schema description is derived from `ProxyConfig` itself.
    // `merge` inspects its `http`, `https` and `no_proxy` members for conflicts.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub proxy: ProxyConfig,

    /// Global acknowledgement options.
    // Deserialized through `bool_or_struct`; the tests in this file configure it with a
    // bare boolean. NOTE(review): presumably a full table form is accepted as well —
    // confirm against `crate::serde::bool_or_struct`.
    #[serde(
        default,
        deserialize_with = "bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(metadata(docs::common = true, docs::required = false))]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(deprecated)]
    /// Deprecated metrics expiration option, expressed as a duration.
    // NOTE(review): presumably superseded by `expire_metrics_secs` — confirm.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::hidden))]
    pub expire_metrics: Option<Duration>,

    /// Metrics expiration option, expressed as a floating-point number.
    // NOTE(review): the field name indicates the unit is seconds; how the value is
    // applied is not visible in this file.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub expire_metrics_secs: Option<f64>,

    /// Per-metric-set expiration options.
    ///
    /// When configurations are merged, the lists from both sides are concatenated.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,

    /// The half-life, in seconds, used for the buffer utilization EWMA.
    ///
    /// Must not be negative.
    // Omitted from serialized output when unset or equal to the buffer crate's
    // `DEFAULT_EWMA_HALF_LIFE_SECONDS`.
    #[serde(skip_serializing_if = "is_default_buffer_utilization_ewma_half_life_seconds")]
    #[configurable(validation(range(min = 0.0)))]
    #[configurable(metadata(docs::advanced))]
    pub buffer_utilization_ewma_half_life_seconds: Option<f64>,

    /// The alpha value used for the latency EWMA.
    ///
    /// Must be between 0.0 and 1.0.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[configurable(metadata(docs::advanced))]
    pub latency_ewma_alpha: Option<f64>,

    /// The refresh period for metrics storage.
    // NOTE(review): unit and consumer are not visible in this file — confirm.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub metrics_storage_refresh_period: Option<f64>,
}

// NOTE(review): judging by the macro name, this derives config generation for
// `GlobalOptions` from its `Default` impl — confirm against `vector_config`.
impl_generate_config_from_default!(GlobalOptions);
196
197impl GlobalOptions {
198 pub fn resolve_and_validate_data_dir(
205 &self,
206 local_data_dir: Option<&PathBuf>,
207 ) -> crate::Result<PathBuf> {
208 let data_dir = local_data_dir
209 .or(self.data_dir.as_ref())
210 .ok_or(DataDirError::MissingDataDir)
211 .map_err(Box::new)?
212 .clone();
213 if !data_dir.exists() {
214 return Err(DataDirError::DoesNotExist { data_dir }.into());
215 }
216 let readonly = std::fs::metadata(&data_dir)
217 .map(|meta| meta.permissions().readonly())
218 .unwrap_or(true);
219 if readonly {
220 return Err(DataDirError::NotWritable { data_dir }.into());
221 }
222 Ok(data_dir)
223 }
224
225 pub fn resolve_and_make_data_subdir(
232 &self,
233 local: Option<&PathBuf>,
234 subdir: &str,
235 ) -> crate::Result<PathBuf> {
236 let data_dir = self.resolve_and_validate_data_dir(local)?;
237
238 let mut data_subdir = data_dir.clone();
239 data_subdir.push(subdir);
240
241 DirBuilder::new()
242 .recursive(true)
243 .create(&data_subdir)
244 .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
245 Ok(data_subdir)
246 }
247
248 pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
255 let mut errors = Vec::new();
256
257 if conflicts(
258 self.wildcard_matching.as_ref(),
259 with.wildcard_matching.as_ref(),
260 ) {
261 errors.push("conflicting values for 'wildcard_matching' found".to_owned());
262 }
263
264 if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
265 errors.push("conflicting values for 'proxy.http' found".to_owned());
266 }
267
268 if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
269 errors.push("conflicting values for 'proxy.https' found".to_owned());
270 }
271
272 if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
273 errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
274 }
275
276 if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
277 errors.push("conflicting values for 'timezone' found".to_owned());
278 }
279
280 if conflicts(
281 self.acknowledgements.enabled.as_ref(),
282 with.acknowledgements.enabled.as_ref(),
283 ) {
284 errors.push("conflicting values for 'acknowledgements' found".to_owned());
285 }
286
287 if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
288 errors.push("conflicting values for 'expire_metrics' found".to_owned());
289 }
290
291 if conflicts(
292 self.expire_metrics_secs.as_ref(),
293 with.expire_metrics_secs.as_ref(),
294 ) {
295 errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
296 }
297
298 let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
299 with.data_dir
300 } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
301 errors.push("conflicting values for 'data_dir' found".to_owned());
304 None
305 } else {
306 self.data_dir.clone()
307 };
308
309 let mut log_schema = self.log_schema.clone();
312 if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
313 errors.extend(merge_errors);
314 }
315
316 let mut telemetry = self.telemetry.clone();
317 telemetry.merge(&with.telemetry);
318
319 let merged_expire_metrics_per_metric_set = match (
320 &self.expire_metrics_per_metric_set,
321 &with.expire_metrics_per_metric_set,
322 ) {
323 (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
324 (Some(a), None) => Some(a.clone()),
325 (None, Some(b)) => Some(b.clone()),
326 (None, None) => None,
327 };
328
329 if errors.is_empty() {
330 Ok(Self {
331 data_dir,
332 wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
333 log_schema,
334 telemetry,
335 acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
336 timezone: self.timezone.or(with.timezone),
337 proxy: self.proxy.merge(&with.proxy),
338 expire_metrics: self.expire_metrics.or(with.expire_metrics),
339 expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
340 expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
341 buffer_utilization_ewma_half_life_seconds: self
342 .buffer_utilization_ewma_half_life_seconds
343 .or(with.buffer_utilization_ewma_half_life_seconds),
344 latency_ewma_alpha: self.latency_ewma_alpha.or(with.latency_ewma_alpha),
345 metrics_storage_refresh_period: self
346 .metrics_storage_refresh_period
347 .or(with.metrics_storage_refresh_period),
348 })
349 } else {
350 Err(errors)
351 }
352 }
353
354 pub fn timezone(&self) -> TimeZone {
356 self.timezone.unwrap_or(TimeZone::Local)
357 }
358
359 pub fn diff(&self, other: &Self) -> Result<Vec<String>, serde_json::Error> {
373 let old_value = serde_json::to_value(self)?;
374 let new_value = serde_json::to_value(other)?;
375
376 let serde_json::Value::Object(old_map) = old_value else {
377 return Ok(vec![]);
378 };
379 let serde_json::Value::Object(new_map) = new_value else {
380 return Ok(vec![]);
381 };
382
383 Ok(old_map
384 .iter()
385 .filter_map(|(k, v_old)| match new_map.get(k) {
386 Some(v_new) if v_new != v_old => Some(k.clone()),
387 _ => None,
388 })
389 .collect())
390 }
391}
392
/// Returns `true` only when both values are present and differ from each other.
///
/// A value missing on either side never counts as a conflict.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    this.zip(that).is_some_and(|(lhs, rhs)| lhs != rhs)
}
396
// Unit tests for `GlobalOptions::merge` and `GlobalOptions::diff`.
//
// Each `merges_*` test defines a local `merge` closure that shadows the generic `merge`
// helper at the bottom of this module, fixing the option name and the projection applied
// to the merged result.
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    // `data_dir`: an absent key deserializes to the default directory; an explicit value
    // wins over the default, and two different explicit values conflict.
    #[test]
    fn merges_data_dir() {
        let merge = |a, b| merge("data_dir", a, b, |result| result.data_dir);

        assert_eq!(merge(None, None), Ok(default_data_dir()));
        assert_eq!(merge(Some("/test1"), None), Ok(Some("/test1".into())));
        assert_eq!(merge(None, Some("/test2")), Ok(Some("/test2".into())));
        assert_eq!(
            merge(Some("/test3"), Some("/test3")),
            Ok(Some("/test3".into()))
        );
        assert_eq!(
            merge(Some("/test4"), Some("/test5")),
            Err(vec!["conflicting values for 'data_dir' found".into()])
        );
    }

    // `timezone`: results are read through the `timezone()` accessor, so an unset value
    // shows up as `TimeZone::Local`.
    #[test]
    fn merges_timezones() {
        let merge = |a, b| merge("timezone", a, b, |result| result.timezone());

        assert_eq!(merge(None, None), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), None), Ok(TimeZone::Local));
        assert_eq!(merge(None, Some("local")), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), Some("local")), Ok(TimeZone::Local),);
        assert_eq!(merge(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
        assert_eq!(
            merge(None, Some("EST5EDT")),
            Ok(TimeZone::Named(Tz::EST5EDT))
        );
        assert_eq!(
            merge(Some("UTC"), Some("UTC")),
            Ok(TimeZone::Named(Tz::UTC))
        );
        assert_eq!(
            merge(Some("CST6CDT"), Some("GMT")),
            Err(vec!["conflicting values for 'timezone' found".into()])
        );
    }

    // `proxy.http`: the dotted key is written straight into the generated TOML, which
    // sets the nested `http` member of the proxy options.
    #[test]
    fn merges_proxy() {
        let merge = |a, b| merge("proxy.http", a, b, |result| result.proxy.http);

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(merge(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            merge(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            merge(Some("test4"), Some("test5")),
            Err(vec!["conflicting values for 'proxy.http' found".into()])
        );
    }

    // `acknowledgements`: configured here with a bare boolean; opposite booleans conflict
    // regardless of which side holds which value.
    #[test]
    fn merges_acknowledgements() {
        let merge = |a, b| merge("acknowledgements", a, b, |result| result.acknowledgements);

        assert_eq!(merge(None, None), Ok(None.into()));
        assert_eq!(merge(Some(false), None), Ok(false.into()));
        assert_eq!(merge(Some(true), None), Ok(true.into()));
        assert_eq!(merge(None, Some(false)), Ok(false.into()));
        assert_eq!(merge(None, Some(true)), Ok(true.into()));
        assert_eq!(merge(Some(false), Some(false)), Ok(false.into()));
        assert_eq!(merge(Some(true), Some(true)), Ok(true.into()));
        assert_eq!(
            merge(Some(false), Some(true)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
        assert_eq!(
            merge(Some(true), Some(false)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
    }

    // `expire_metrics_secs`: equal values merge cleanly, different values conflict.
    #[test]
    fn merges_expire_metrics() {
        let merge = |a, b| {
            merge("expire_metrics_secs", a, b, |result| {
                result.expire_metrics_secs
            })
        };

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some(1.0), None), Ok(Some(1.0)));
        assert_eq!(merge(None, Some(2.0)), Ok(Some(2.0)));
        assert_eq!(merge(Some(3.0), Some(3.0)), Ok(Some(3.0)));
        assert_eq!(
            merge(Some(4.0), Some(5.0)),
            Err(vec![
                "conflicting values for 'expire_metrics_secs' found".into()
            ])
        );
    }

    // `diff` reports only the top-level key whose value differs between the two options.
    #[test]
    fn diff_detects_changed_keys() {
        let old = GlobalOptions {
            data_dir: Some(std::path::PathBuf::from("/path1")),
            ..Default::default()
        };
        let new = GlobalOptions {
            data_dir: Some(std::path::PathBuf::from("/path2")),
            ..Default::default()
        };
        assert_eq!(
            old.diff(&new).expect("diff failed"),
            vec!["data_dir".to_string()]
        );
    }

    // Builds two configs that each (optionally) set the option `name`, merges the second
    // into the first, and applies `result` to project the merged options down to the
    // value under test. Merge errors are passed through unchanged.
    fn merge<P: Debug, T>(
        name: &str,
        dd1: Option<P>,
        dd2: Option<P>,
        result: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        // Configs are built from TOML so the tests do not need to know the defaults.
        make_config(name, dd1)
            .merge(make_config(name, dd2))
            .map(result)
    }

    // Parses a TOML document of the form `name = <Debug rendering of value>`, or an empty
    // document when `value` is `None` so that every option takes its deserialization
    // default. Panics if the generated TOML does not parse into `GlobalOptions`.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        toml::from_str(&value.map_or(String::new(), |value| format!(r"{name} = {value:?}")))
            .unwrap()
    }
}