vector_core/config/
global_options.rs

1use std::{fs::DirBuilder, path::PathBuf, time::Duration};
2
3use snafu::{ResultExt, Snafu};
4use vector_common::TimeZone;
5use vector_config::{configurable_component, impl_generate_config_from_default};
6
7use super::super::default_data_dir;
8use super::metrics_expiration::PerMetricSetExpiration;
9use super::Telemetry;
10use super::{proxy::ProxyConfig, AcknowledgementsConfig, LogSchema};
11use crate::serde::bool_or_struct;
12
13#[derive(Debug, Snafu)]
14pub(crate) enum DataDirError {
15    #[snafu(display("data_dir option required, but not given here or globally"))]
16    MissingDataDir,
17    #[snafu(display("data_dir {:?} does not exist", data_dir))]
18    DoesNotExist { data_dir: PathBuf },
19    #[snafu(display("data_dir {:?} is not writable", data_dir))]
20    NotWritable { data_dir: PathBuf },
21    #[snafu(display(
22        "Could not create subdirectory {:?} inside of data dir {:?}: {}",
23        subdir,
24        data_dir,
25        source
26    ))]
27    CouldNotCreate {
28        subdir: PathBuf,
29        data_dir: PathBuf,
30        source: std::io::Error,
31    },
32}
33
34/// Specifies the wildcard matching mode, relaxed allows configurations where wildcard doesn not match any existing inputs
35#[configurable_component]
36#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
37#[serde(rename_all = "lowercase")]
38pub enum WildcardMatching {
39    /// Strict matching (must match at least one existing input)
40    #[default]
41    Strict,
42
43    /// Relaxed matching (must match 0 or more inputs)
44    Relaxed,
45}
46
47/// Global configuration options.
48//
49// If this is modified, make sure those changes are reflected in the `ConfigBuilder::append`
50// function!
51#[configurable_component(global_option("global_option"))]
52#[derive(Clone, Debug, Default, PartialEq)]
53pub struct GlobalOptions {
54    /// The directory used for persisting Vector state data.
55    ///
56    /// This is the directory where Vector will store any state data, such as disk buffers, file
57    /// checkpoints, and more.
58    ///
59    /// Vector must have write permissions to this directory.
60    #[serde(default = "crate::default_data_dir")]
61    #[configurable(metadata(docs::common = false))]
62    pub data_dir: Option<PathBuf>,
63
64    /// Set wildcard matching mode for inputs
65    ///
66    /// Setting this to "relaxed" allows configurations with wildcards that do not match any inputs
67    /// to be accepted without causing an error.
68    #[serde(skip_serializing_if = "crate::serde::is_default")]
69    #[configurable(metadata(docs::common = false, docs::required = false))]
70    pub wildcard_matching: Option<WildcardMatching>,
71
72    /// Default log schema for all events.
73    ///
74    /// This is used if a component does not have its own specific log schema. All events use a log
75    /// schema, whether or not the default is used, to assign event fields on incoming events.
76    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
77    #[configurable(metadata(docs::common = false, docs::required = false))]
78    pub log_schema: LogSchema,
79
80    /// Telemetry options.
81    ///
82    /// Determines whether `source` and `service` tags should be emitted with the
83    /// `component_sent_*` and `component_received_*` events.
84    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
85    #[configurable(metadata(docs::common = false, docs::required = false))]
86    pub telemetry: Telemetry,
87
88    /// The name of the time zone to apply to timestamp conversions that do not contain an explicit time zone.
89    ///
90    /// The time zone name may be any name in the [TZ database][tzdb] or `local` to indicate system
91    /// local time.
92    ///
93    /// Note that in Vector/VRL all timestamps are represented in UTC.
94    ///
95    /// [tzdb]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
96    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
97    #[configurable(metadata(docs::common = false))]
98    pub timezone: Option<TimeZone>,
99
100    #[configurable(derived)]
101    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
102    #[configurable(metadata(docs::common = false, docs::required = false))]
103    pub proxy: ProxyConfig,
104
105    /// Controls how acknowledgements are handled for all sinks by default.
106    ///
107    /// See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event
108    /// acknowledgement.
109    ///
110    /// [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/
111    #[serde(
112        default,
113        deserialize_with = "bool_or_struct",
114        skip_serializing_if = "crate::serde::is_default"
115    )]
116    #[configurable(metadata(docs::common = true, docs::required = false))]
117    pub acknowledgements: AcknowledgementsConfig,
118
119    /// The amount of time, in seconds, that internal metrics will persist after having not been
120    /// updated before they expire and are removed.
121    ///
122    /// Deprecated: use expire_metrics_secs instead
123    #[configurable(deprecated)]
124    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
125    #[configurable(metadata(docs::hidden))]
126    pub expire_metrics: Option<Duration>,
127
128    /// The amount of time, in seconds, that internal metrics will persist after having not been
129    /// updated before they expire and are removed.
130    ///
131    /// Set this to a value larger than your `internal_metrics` scrape interval (default 5 minutes)
132    /// so metrics live long enough to be emitted and captured.
133    #[serde(skip_serializing_if = "crate::serde::is_default")]
134    #[configurable(metadata(docs::common = false, docs::required = false))]
135    pub expire_metrics_secs: Option<f64>,
136
137    /// This allows configuring different expiration intervals for different metric sets.
138    /// By default this is empty and any metric not matched by one of these sets will use
139    /// the global default value, defined using `expire_metrics_secs`.
140    #[serde(skip_serializing_if = "crate::serde::is_default")]
141    pub expire_metrics_per_metric_set: Option<Vec<PerMetricSetExpiration>>,
142}
143
144impl_generate_config_from_default!(GlobalOptions);
145
146impl GlobalOptions {
147    /// Resolve the `data_dir` option in either the global or local config, and
148    /// validate that it exists and is writable.
149    ///
150    /// # Errors
151    ///
152    /// Function will error if it is unable to make data directory.
153    pub fn resolve_and_validate_data_dir(
154        &self,
155        local_data_dir: Option<&PathBuf>,
156    ) -> crate::Result<PathBuf> {
157        let data_dir = local_data_dir
158            .or(self.data_dir.as_ref())
159            .ok_or(DataDirError::MissingDataDir)
160            .map_err(Box::new)?
161            .clone();
162        if !data_dir.exists() {
163            return Err(DataDirError::DoesNotExist { data_dir }.into());
164        }
165        let readonly = std::fs::metadata(&data_dir)
166            .map(|meta| meta.permissions().readonly())
167            .unwrap_or(true);
168        if readonly {
169            return Err(DataDirError::NotWritable { data_dir }.into());
170        }
171        Ok(data_dir)
172    }
173
174    /// Resolve the `data_dir` option using `resolve_and_validate_data_dir` and
175    /// then ensure a named subdirectory exists.
176    ///
177    /// # Errors
178    ///
179    /// Function will error if it is unable to make data subdirectory.
180    pub fn resolve_and_make_data_subdir(
181        &self,
182        local: Option<&PathBuf>,
183        subdir: &str,
184    ) -> crate::Result<PathBuf> {
185        let data_dir = self.resolve_and_validate_data_dir(local)?;
186
187        let mut data_subdir = data_dir.clone();
188        data_subdir.push(subdir);
189
190        DirBuilder::new()
191            .recursive(true)
192            .create(&data_subdir)
193            .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
194        Ok(data_subdir)
195    }
196
197    /// Merge a second global configuration into self, and return the new merged data.
198    ///
199    /// # Errors
200    ///
201    /// Returns a list of textual errors if there is a merge conflict between the two global
202    /// configs.
203    pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
204        let mut errors = Vec::new();
205
206        if conflicts(
207            self.wildcard_matching.as_ref(),
208            with.wildcard_matching.as_ref(),
209        ) {
210            errors.push("conflicting values for 'wildcard_matching' found".to_owned());
211        }
212
213        if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
214            errors.push("conflicting values for 'proxy.http' found".to_owned());
215        }
216
217        if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
218            errors.push("conflicting values for 'proxy.https' found".to_owned());
219        }
220
221        if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
222            errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
223        }
224
225        if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
226            errors.push("conflicting values for 'timezone' found".to_owned());
227        }
228
229        if conflicts(
230            self.acknowledgements.enabled.as_ref(),
231            with.acknowledgements.enabled.as_ref(),
232        ) {
233            errors.push("conflicting values for 'acknowledgements' found".to_owned());
234        }
235
236        if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
237            errors.push("conflicting values for 'expire_metrics' found".to_owned());
238        }
239
240        if conflicts(
241            self.expire_metrics_secs.as_ref(),
242            with.expire_metrics_secs.as_ref(),
243        ) {
244            errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
245        }
246
247        let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
248            with.data_dir
249        } else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
250            // If two configs both set 'data_dir' and have conflicting values
251            // we consider this an error.
252            errors.push("conflicting values for 'data_dir' found".to_owned());
253            None
254        } else {
255            self.data_dir.clone()
256        };
257
258        // If the user has multiple config files, we must *merge* log schemas
259        // until we meet a conflict, then we are allowed to error.
260        let mut log_schema = self.log_schema.clone();
261        if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
262            errors.extend(merge_errors);
263        }
264
265        let mut telemetry = self.telemetry.clone();
266        telemetry.merge(&with.telemetry);
267
268        let merged_expire_metrics_per_metric_set = match (
269            &self.expire_metrics_per_metric_set,
270            &with.expire_metrics_per_metric_set,
271        ) {
272            (Some(a), Some(b)) => Some(a.iter().chain(b).cloned().collect()),
273            (Some(a), None) => Some(a.clone()),
274            (None, Some(b)) => Some(b.clone()),
275            (None, None) => None,
276        };
277
278        if errors.is_empty() {
279            Ok(Self {
280                data_dir,
281                wildcard_matching: self.wildcard_matching.or(with.wildcard_matching),
282                log_schema,
283                telemetry,
284                acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
285                timezone: self.timezone.or(with.timezone),
286                proxy: self.proxy.merge(&with.proxy),
287                expire_metrics: self.expire_metrics.or(with.expire_metrics),
288                expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
289                expire_metrics_per_metric_set: merged_expire_metrics_per_metric_set,
290            })
291        } else {
292            Err(errors)
293        }
294    }
295
296    /// Get the configured time zone, using "local" time if none is set.
297    pub fn timezone(&self) -> TimeZone {
298        self.timezone.unwrap_or(TimeZone::Local)
299    }
300}
301
/// Reports whether two optional settings disagree.
///
/// Returns `true` only when both values are present and unequal; a missing value on either
/// side never counts as a conflict.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    match (this, that) {
        (Some(left), Some(right)) => left != right,
        _ => false,
    }
}
305
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    /// The error list `GlobalOptions::merge` reports when exactly one option conflicts.
    fn conflict_error(option: &str) -> Vec<String> {
        vec![format!("conflicting values for '{option}' found")]
    }

    #[test]
    fn merges_data_dir() {
        let merge = |a, b| merge("data_dir", a, b, |result| result.data_dir);

        assert_eq!(merge(None, None), Ok(default_data_dir()));
        assert_eq!(merge(Some("/test1"), None), Ok(Some("/test1".into())));
        assert_eq!(merge(None, Some("/test2")), Ok(Some("/test2".into())));
        assert_eq!(
            merge(Some("/test3"), Some("/test3")),
            Ok(Some("/test3".into()))
        );
        assert_eq!(
            merge(Some("/test4"), Some("/test5")),
            Err(conflict_error("data_dir"))
        );
    }

    #[test]
    fn merges_timezones() {
        let merge = |a, b| merge("timezone", a, b, |result| result.timezone());

        assert_eq!(merge(None, None), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), None), Ok(TimeZone::Local));
        assert_eq!(merge(None, Some("local")), Ok(TimeZone::Local));
        assert_eq!(merge(Some("local"), Some("local")), Ok(TimeZone::Local));
        assert_eq!(merge(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
        assert_eq!(
            merge(None, Some("EST5EDT")),
            Ok(TimeZone::Named(Tz::EST5EDT))
        );
        assert_eq!(
            merge(Some("UTC"), Some("UTC")),
            Ok(TimeZone::Named(Tz::UTC))
        );
        assert_eq!(
            merge(Some("CST6CDT"), Some("GMT")),
            Err(conflict_error("timezone"))
        );
    }

    #[test]
    fn merges_proxy() {
        // `.http` stands in for the other proxy settings, which are all compared for
        // equality in the same way by `GlobalOptions::merge`.
        let merge = |a, b| merge("proxy.http", a, b, |result| result.proxy.http);

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(merge(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            merge(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            merge(Some("test4"), Some("test5")),
            Err(conflict_error("proxy.http"))
        );
    }

    #[test]
    fn merges_acknowledgements() {
        let merge = |a, b| merge("acknowledgements", a, b, |result| result.acknowledgements);

        assert_eq!(merge(None, None), Ok(None.into()));

        // A value set on one side, or equally on both, always survives the merge.
        for value in [false, true] {
            assert_eq!(merge(Some(value), None), Ok(value.into()));
            assert_eq!(merge(None, Some(value)), Ok(value.into()));
            assert_eq!(merge(Some(value), Some(value)), Ok(value.into()));
        }

        // Differing values conflict regardless of order.
        for (first, second) in [(false, true), (true, false)] {
            assert_eq!(
                merge(Some(first), Some(second)),
                Err(conflict_error("acknowledgements"))
            );
        }
    }

    #[test]
    fn merges_expire_metrics() {
        let merge =
            |a, b| merge("expire_metrics_secs", a, b, |result| result.expire_metrics_secs);

        assert_eq!(merge(None, None), Ok(None));
        assert_eq!(merge(Some(1.0), None), Ok(Some(1.0)));
        assert_eq!(merge(None, Some(2.0)), Ok(Some(2.0)));
        assert_eq!(merge(Some(3.0), Some(3.0)), Ok(Some(3.0)));
        assert_eq!(
            merge(Some(4.0), Some(5.0)),
            Err(conflict_error("expire_metrics_secs"))
        );
    }

    /// Builds two single-option configs from TOML, merges them, and projects the result.
    fn merge<P: Debug, T>(
        name: &str,
        a: Option<P>,
        b: Option<P>,
        result: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        // Use TOML parsing to match the behavior of what a user would actually configure.
        let first = make_config(name, a);
        let second = make_config(name, b);
        first.merge(second).map(result)
    }

    /// Parses a config that sets only `name` (or nothing at all when `value` is `None`).
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        let toml_source = match value {
            Some(value) => format!(r"{name} = {value:?}"),
            None => String::new(),
        };
        toml::from_str(&toml_source).unwrap()
    }
}