vector_core/config/global_options.rs

1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::{
8    super::default_data_dir, AcknowledgementsConfig, LogSchema, Telemetry,
9    metrics_expiration::PerMetricSetExpiration, proxy::ProxyConfig,
10};
11use crate::serde::bool_or_struct;
12
13#[expect(
14    clippy::ref_option,
15    reason = "we have to follow the serde calling convention"
16)]
17fn is_default_buffer_utilization_ewma_half_life_seconds(value: &Option<f64>) -> bool {
18    value.is_none_or(|seconds| {
19        seconds == vector_buffers::topology::channel::DEFAULT_EWMA_HALF_LIFE_SECONDS
20    })
21}
22
23#[derive(Debug, Snafu)]
24pub(crate) enum DataDirError {
25    #[snafu(display("data_dir option required, but not given here or globally"))]
26    MissingDataDir,
27    #[snafu(display("data_dir {:?} does not exist", data_dir))]
28    DoesNotExist { data_dir: PathBuf },
29    #[snafu(display("data_dir {:?} is not writable", data_dir))]
30    NotWritable { data_dir: PathBuf },
31    #[snafu(display(
32        "Could not create subdirectory {:?} inside of data dir {:?}: {}",
33        subdir,
34        data_dir,
35        source
36    ))]
37    CouldNotCreate {
38        subdir: PathBuf,
39        data_dir: PathBuf,
40        source: std::io::Error,
41    },
42}
43
44/// Specifies the wildcard matching mode, relaxed allows configurations where wildcard doesn not match any existing inputs
45#[configurable_component]
46#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
47#[serde(rename_all = "lowercase")]
48pub enum WildcardMatching {
49    /// Strict matching (must match at least one existing input)
50    #[default]
51    Strict,
52
53    /// Relaxed matching (must match 0 or more inputs)
54    Relaxed,
55}
56
/// Global configuration options.
//
// If this is modified, make sure those changes are reflected in the `ConfigBuilder::append`
// function!
// `GlobalOptions::merge` in this file also constructs every field explicitly and must be kept
// in sync as well.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
    /// The directory used for persisting Vector state data.
    ///
    /// This is the directory where Vector will store any state data, such as disk buffers, file
    /// checkpoints, and more.
    ///
    /// Vector must have write permissions to this directory.
    // Deserialization falls back to `crate::default_data_dir()` when the option is omitted, but
    // the derived `Default` impl leaves this field as `None`. There is no `skip_serializing_if`,
    // so this field is always serialized.
    #[serde(default = "crate::default_data_dir")]
    #[configurable(metadata(docs::common = false))]
    pub data_dir: Option<PathBuf>,

    /// Set wildcard matching mode for inputs
    ///
    /// Setting this to "relaxed" allows configurations with wildcards that do not match any inputs
    /// to be accepted without causing an error.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub wildcard_matching: Option<WildcardMatching>,

    /// Default log schema for all events.
    ///
    /// This is used if a component does not have its own specific log schema. All events use a log
    /// schema, whether or not the default is used, to assign event fields on incoming events.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub log_schema: LogSchema,

    /// Telemetry options.
    ///
    /// Determines whether `source` and `service` tags should be emitted with the
    /// `component_sent_*` and `component_received_*` events.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub telemetry: Telemetry,

    /// The name of the time zone to apply to timestamp conversions that do not contain an explicit time zone.
    ///
    /// The time zone name may be any name in the [TZ database][tzdb] or `local` to indicate system
    /// local time.
    ///
    /// Note that in Vector/VRL all timestamps are represented in UTC.
    ///
    /// [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    // Left as `None` when unset; `GlobalOptions::timezone()` substitutes `TimeZone::Local`.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false))]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub proxy: ProxyConfig,

    /// Controls how acknowledgements are handled for all sinks by default.
    ///
    /// See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event
    /// acknowledgement.
    ///
    /// [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/
    // `bool_or_struct` lets this option be given either as a bare boolean or as a full table.
    #[serde(
        default,
        deserialize_with = "bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    #[configurable(metadata(docs::common = true, docs::required = false))]
    pub acknowledgements: AcknowledgementsConfig,

    /// The amount of time, in seconds, that internal metrics will persist after having not been
    /// updated before they expire and are removed.
    ///
    /// Deprecated: use `expire_metrics_secs` instead
    // Hidden from generated docs. `merge` conflict-checks it independently of
    // `expire_metrics_secs`.
    #[configurable(deprecated)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::hidden))]
    pub expire_metrics: Option<Duration>,

    /// The amount of time, in seconds, that internal metrics will persist after having not been
    /// updated before they expire and are removed.
    ///
    /// Set this to a value larger than your `internal_metrics` scrape interval (default 5 minutes)
    /// so metrics live long enough to be emitted and captured.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(docs::common = false, docs::required = false))]
    pub expire_metrics_secs: Option<f64>,

    /// This allows configuring different expiration intervals for different metric sets.
    /// By default this is empty and any metric not matched by one of these sets will use
    /// the global default value, defined using `expire_metrics_secs`.
    // When two configs are merged, the lists from both sides are concatenated rather than
    // conflict-checked.
    #[serde(skip_serializing_if = "crate::serde::is_default")]
    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,

    /// The half-life, in seconds, for the exponential weighted moving average (EWMA) of source
    /// and transform buffer utilization metrics.
    ///
    /// This controls how quickly the `*_buffer_utilization_mean` gauges respond to new
    /// observations. Longer half-lives retain more of the previous value, leading to slower
    /// adjustments.
    ///
    /// - Lower values (< 1): Metrics update quickly but may be volatile
    /// - Default (5): Balanced between responsiveness and stability
    /// - Higher values (> 5): Smooth, stable metrics that update slowly
    ///
    /// Adjust based on whether you need fast detection of buffer issues (lower)
    /// or want to see sustained trends without noise (higher).
    ///
    /// Must be greater than 0.
    // Omitted when serializing if unset or equal to the channel's default half-life
    // (`is_default_buffer_utilization_ewma_half_life_seconds`).
    // NOTE(review): the docs say "Must be greater than 0", but `range(min = 0.0)` appears to
    // admit 0.0 itself — confirm whether an exclusive lower bound is intended.
    #[serde(skip_serializing_if = "is_default_buffer_utilization_ewma_half_life_seconds")]
    #[configurable(validation(range(min = 0.0)))]
    #[configurable(metadata(docs::advanced))]
    pub buffer_utilization_ewma_half_life_seconds: Option<f64>,

    /// The alpha value for the exponential weighted moving average (EWMA) of transform latency
    /// metrics.
    ///
    /// This controls how quickly the `component_latency_mean_seconds` gauge responds to new
    /// observations. Values closer to 1.0 retain more of the previous value, leading to slower
    /// adjustments. The default value of 0.9 is equivalent to a "half life" of 6-7 measurements.
    ///
    /// Must be between 0 and 1 exclusively (0 < alpha < 1).
    // NOTE(review): the docs require 0 < alpha < 1, but `range(min = 0.0, max = 1.0)` appears to
    // admit both endpoints — confirm whether exclusive bounds are enforced elsewhere.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(validation(range(min = 0.0, max = 1.0)))]
    #[configurable(metadata(docs::advanced))]
    pub latency_ewma_alpha: Option<f64>,

    /// The interval, in seconds, at which the internal metrics cache for VRL is refreshed.
    /// This must be set to be able to access metrics in VRL functions.
    ///
    /// Higher values lead to stale metric values from `get_vector_metric`,
    /// `find_vector_metrics`, and `aggregate_vector_metrics` functions.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub metrics_storage_refresh_period: Option<f64>,
}

// Presumably implements config generation for `GlobalOptions` from its `Default` value —
// confirm against the `vector_config` macro definition. Note that the derived `Default` leaves
// `data_dir` as `None` rather than `crate::default_data_dir()`.
impl_generate_config_from_default!(GlobalOptions);
196
197impl GlobalOptions {
198    /// Resolve the `data_dir` option in either the global or local config, and
199    /// validate that it exists and is writable.
200    ///
201    /// # Errors
202    ///
203    /// Function will error if it is unable to make data directory.
204    pub fn resolve_and_validate_data_dir(
205        &self,
206        local_data_dir: Option<&PathBuf>,
207    ) -> crate::Result<PathBuf> {
208        let data_dir = local_data_dir
209            .or(self.data_dir.as_ref())
210            .ok_or(DataDirError::MissingDataDir)
211            .map_err(Box::new)?
212            .clone();
213        if !data_dir.exists() {
214            return Err(DataDirError::DoesNotExist { data_dir }.into());
215        }
216        let readonly = std::fs::metadata(&data_dir)
217            .map(|meta| meta.permissions().readonly())
218            .unwrap_or(true);
219        if readonly {
220            return Err(DataDirError::NotWritable { data_dir }.into());
221        }
222        Ok(data_dir)
223    }
224
225    /// Resolve the `data_dir` option using `resolve_and_validate_data_dir` and
226    /// then ensure a named subdirectory exists.
227    ///
228    /// # Errors
229    ///
230    /// Function will error if it is unable to make data subdirectory.
231    pub fn resolve_and_make_data_subdir(
232        &self,
233        local: Option<&PathBuf>,
234        subdir: &str,
235    ) -> crate::Result<PathBuf> {
236        let data_dir = self.resolve_and_validate_data_dir(local)?;
237
238        let mut data_subdir = data_dir.clone();
239        data_subdir.push(subdir);
240
241        DirBuilder::new()
242            .recursive(true)
243            .create(&data_subdir)
244            .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
245        Ok(data_subdir)
246    }
247
248    /// Merge a second global configuration into self, and return the new merged data.
249    ///
250    /// # Errors
251    ///
252    /// Returns a list of textual errors if there is a merge conflict between the two global
253    /// configs.
254    pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
255        let mut errors = Vec::new();
256
257        if conflicts(
258            self.wildcard_matching.as_ref(),
259            with.wildcard_matching.as_ref(),
260        ) {
261            errors.push("conflicting values for 'wildcard_matching' found".to_owned());
262        }
263
264        if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
265            errors.push("conflicting values for 'proxy.http' found".to_owned());
266        }
267
268        if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
269            errors.push("conflicting values for 'proxy.https' found".to_owned());
270        }
271
272        if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
273            errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
274        }
275
276        if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
277            errors.push("conflicting values for 'timezone' found".to_owned());
278        }
279
280        if conflicts(
281            self.acknowledgements.enabled.as_ref(),
282            with.acknowledgements.enabled.as_ref(),
283        ) {
284            errors.push("conflicting values for 'acknowledgements' found".to_owned());
285        }
286
287        if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
288            errors.push("conflicting values for 'expire_metrics' found".to_owned());
289        }
290
291        if conflicts(
292            self.expire_metrics_secs.as_ref(),
293            with.expire_metrics_secs.as_ref(),
294        ) {
295            errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
296        }
297
298        let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
299            with.data_dir
300        } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
301            // If two configs both set 'data_dir' and have conflicting values
302            // we consider this an error.
303            errors.push("conflicting values for 'data_dir' found".to_owned());
304            None
305        } else {
306            self.data_dir.clone()
307        };
308
309        // If the user has multiple config files, we must *merge* log schemas
310        // until we meet a conflict, then we are allowed to error.
311        let mut log_schema = self.log_schema.clone();
312        if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
313            errors.extend(merge_errors);
314        }
315
316        let mut telemetry = self.telemetry.clone();
317        telemetry.merge(&with.telemetry);
318
319        let merged_expire_metrics_per_metric_set = match (
320            &self.expire_metrics_per_metric_set,
321            &with.expire_metrics_per_metric_set,
322        ) {
323            (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
324            (Some(a), None) => Some(a.clone()),
325            (None, Some(b)) => Some(b.clone()),
326            (None, None) => None,
327        };
328
329        if errors.is_empty() {
330            Ok(Self {
331                data_dir,
332                wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
333                log_schema,
334                telemetry,
335                acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
336                timezone: self.timezone.or(with.timezone),
337                proxy: self.proxy.merge(&with.proxy),
338                expire_metrics: self.expire_metrics.or(with.expire_metrics),
339                expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
340                expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
341                buffer_utilization_ewma_half_life_seconds: self
342                    .buffer_utilization_ewma_half_life_seconds
343                    .or(with.buffer_utilization_ewma_half_life_seconds),
344                latency_ewma_alpha: self.latency_ewma_alpha.or(with.latency_ewma_alpha),
345                metrics_storage_refresh_period: self
346                    .metrics_storage_refresh_period
347                    .or(with.metrics_storage_refresh_period),
348            })
349        } else {
350            Err(errors)
351        }
352    }
353
354    /// Get the configured time zone, using "local" time if none is set.
355    pub fn timezone(&self) -> TimeZone {
356        self.timezone.unwrap_or(TimeZone::Local)
357    }
358
359    /// Returns a list of top-level field names that differ between two [`GlobalOptions`] values.
360    ///
361    /// This function performs a shallow comparison by serializing both configs to JSON
362    /// and comparing their top-level keys.
363    ///
364    /// Useful for logging which global fields changed during config reload attempts.
365    ///
366    /// # Errors
367    ///
368    /// Returns a [`serde_json::Error`] if either of the [`GlobalOptions`] values
369    /// cannot be serialized into a JSON object. This is unlikely under normal usage,
370    /// but may occur if serialization fails due to unexpected data structures or changes
371    /// in the type definition.
372    pub fn diff(&self, other: &Self) -> Result<Vec<String>, serde_json::Error> {
373        let old_value = serde_json::to_value(self)?;
374        let new_value = serde_json::to_value(other)?;
375
376        let serde_json::Value::Object(old_map) = old_value else {
377            return Ok(vec![]);
378        };
379        let serde_json::Value::Object(new_map) = new_value else {
380            return Ok(vec![]);
381        };
382
383        Ok(old_map
384            .iter()
385            .filter_map(|(k, v_old)| match new_map.get(k) {
386                Some(v_new) if v_new != v_old => Some(k.clone()),
387                _ => None,
388            })
389            .collect())
390    }
391}
392
/// Reports whether two optional settings disagree.
///
/// Returns `true` only when both sides are set and their values are not equal; a missing
/// value on either side never counts as a conflict.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    this.zip(that).is_some_and(|(left, right)| left != right)
}
396
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    /// Builds a `GlobalOptions` by parsing TOML, matching what a user would actually configure.
    /// `None` yields an empty document, i.e. every option at its deserialization default.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        let toml_text = match value {
            Some(value) => format!(r"{name} = {value:?}"),
            None => String::new(),
        };
        toml::from_str(&toml_text).unwrap()
    }

    /// Parses two single-option configs, merges the first with the second, and projects the
    /// merged result through `extract`.
    fn merge<P: Debug, T>(
        name: &str,
        first: Option<P>,
        second: Option<P>,
        extract: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        let base = make_config(name, first);
        let overlay = make_config(name, second);
        base.merge(overlay).map(extract)
    }

    #[test]
    fn merges_data_dir() {
        let check = |first, second| merge("data_dir", first, second, |options| options.data_dir);

        assert_eq!(check(None, None), Ok(default_data_dir()));
        assert_eq!(check(Some("/test1"), None), Ok(Some("/test1".into())));
        assert_eq!(check(None, Some("/test2")), Ok(Some("/test2".into())));
        assert_eq!(
            check(Some("/test3"), Some("/test3")),
            Ok(Some("/test3".into()))
        );
        assert_eq!(
            check(Some("/test4"), Some("/test5")),
            Err(vec!["conflicting values for 'data_dir' found".into()])
        );
    }

    #[test]
    fn merges_timezones() {
        let check = |first: Option<&str>, second: Option<&str>| {
            merge("timezone", first, second, |options| options.timezone())
        };

        for (first, second, expected) in [
            (None, None, TimeZone::Local),
            (Some("local"), None, TimeZone::Local),
            (None, Some("local"), TimeZone::Local),
            (Some("local"), Some("local"), TimeZone::Local),
            (Some("UTC"), None, TimeZone::Named(Tz::UTC)),
            (None, Some("EST5EDT"), TimeZone::Named(Tz::EST5EDT)),
            (Some("UTC"), Some("UTC"), TimeZone::Named(Tz::UTC)),
        ] {
            assert_eq!(
                check(first, second),
                Ok(expected),
                "merging {first:?} with {second:?}"
            );
        }

        assert_eq!(
            check(Some("CST6CDT"), Some("GMT")),
            Err(vec!["conflicting values for 'timezone' found".into()])
        );
    }

    #[test]
    fn merges_proxy() {
        // Only `proxy.http` is exercised here; `proxy.https` goes through the same
        // equality-based conflict check in `GlobalOptions::merge`.
        let check =
            |first, second| merge("proxy.http", first, second, |options| options.proxy.http);

        assert_eq!(check(None, None), Ok(None));
        assert_eq!(check(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(check(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            check(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            check(Some("test4"), Some("test5")),
            Err(vec!["conflicting values for 'proxy.http' found".into()])
        );
    }

    #[test]
    fn merges_acknowledgements() {
        let check = |first, second| {
            merge("acknowledgements", first, second, |options| {
                options.acknowledgements
            })
        };

        assert_eq!(check(None, None), Ok(None.into()));
        assert_eq!(check(Some(false), None), Ok(false.into()));
        assert_eq!(check(Some(true), None), Ok(true.into()));
        assert_eq!(check(None, Some(false)), Ok(false.into()));
        assert_eq!(check(None, Some(true)), Ok(true.into()));
        assert_eq!(check(Some(false), Some(false)), Ok(false.into()));
        assert_eq!(check(Some(true), Some(true)), Ok(true.into()));

        for (first, second) in [(false, true), (true, false)] {
            assert_eq!(
                check(Some(first), Some(second)),
                Err(vec![
                    "conflicting values for 'acknowledgements' found".into()
                ])
            );
        }
    }

    #[test]
    fn merges_expire_metrics() {
        let check = |first: Option<f64>, second: Option<f64>| {
            merge("expire_metrics_secs", first, second, |options| {
                options.expire_metrics_secs
            })
        };

        for (first, second, expected) in [
            (None, None, None),
            (Some(1.0), None, Some(1.0)),
            (None, Some(2.0), Some(2.0)),
            (Some(3.0), Some(3.0), Some(3.0)),
        ] {
            assert_eq!(check(first, second), Ok(expected));
        }

        assert_eq!(
            check(Some(4.0), Some(5.0)),
            Err(vec![
                "conflicting values for 'expire_metrics_secs' found".into()
            ])
        );
    }

    #[test]
    fn diff_detects_changed_keys() {
        let with_data_dir = |path: &str| GlobalOptions {
            data_dir: Some(std::path::PathBuf::from(path)),
            ..Default::default()
        };

        let changed = with_data_dir("/path1")
            .diff(&with_data_dir("/path2"))
            .expect("diff failed");

        assert_eq!(changed, vec!["data_dir".to_string()]);
    }
}