use std::{fs::DirBuilder, path::PathBuf, time::Duration};
use snafu::{ResultExt, Snafu};
use vector_common::TimeZone;
use vector_config::configurable_component;
use super::super::default_data_dir;
use super::Telemetry;
use super::{proxy::ProxyConfig, AcknowledgementsConfig, LogSchema};
use crate::serde::bool_or_struct;
/// Errors raised while resolving, validating, or preparing a data directory.
#[derive(Debug, Snafu)]
pub(crate) enum DataDirError {
/// No `data_dir` was provided, neither for the caller nor in the global options.
#[snafu(display("data_dir option required, but not given here or globally"))]
MissingDataDir,
/// The resolved `data_dir` path does not exist on the filesystem.
#[snafu(display("data_dir {:?} does not exist", data_dir))]
DoesNotExist { data_dir: PathBuf },
/// The resolved `data_dir` is read-only, or its metadata could not be read.
#[snafu(display("data_dir {:?} is not writable", data_dir))]
NotWritable { data_dir: PathBuf },
/// Creating `subdir` underneath `data_dir` failed; `source` is the underlying I/O error.
#[snafu(display(
"Could not create subdirectory {:?} inside of data dir {:?}: {}",
subdir,
data_dir,
source
))]
CouldNotCreate {
subdir: PathBuf,
data_dir: PathBuf,
source: std::io::Error,
},
}
/// Global configuration options.
///
/// Two sets of options can be combined with `GlobalOptions::merge`, which reports
/// conflicting values instead of silently picking one.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GlobalOptions {
/// The directory inside which data subdirectories are created.
///
/// When the key is absent during deserialization, `crate::default_data_dir` supplies
/// the value. A per-caller directory passed to `resolve_and_validate_data_dir` takes
/// precedence over this one.
#[serde(default = "crate::default_data_dir")]
pub data_dir: Option<PathBuf>,
/// Log schema settings.
///
/// Merged through `LogSchema::merge`, which may report its own errors.
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub log_schema: LogSchema,
/// Telemetry settings.
///
/// Merged through `Telemetry::merge`.
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub telemetry: Telemetry,
/// The time zone to use.
///
/// When unset, `GlobalOptions::timezone` falls back to `TimeZone::Local`.
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub timezone: Option<TimeZone>,
// The `http`, `https`, and `no_proxy` members are checked for conflicts when merging.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub proxy: ProxyConfig,
/// Acknowledgement settings.
///
/// Deserialized through `bool_or_struct`; the tests in this file supply a bare boolean.
#[serde(
default,
deserialize_with = "bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
/// Deprecated option.
///
/// NOTE(review): presumably superseded by `expire_metrics_secs` -- confirm.
#[configurable(deprecated)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub expire_metrics: Option<Duration>,
/// Optional floating-point value.
///
/// NOTE(review): presumably the metric expiry interval in seconds -- confirm.
#[serde(skip_serializing_if = "crate::serde::is_default")]
pub expire_metrics_secs: Option<f64>,
}
impl GlobalOptions {
    /// Resolves the data directory to use and checks that it is usable.
    ///
    /// `local_data_dir`, when given, takes precedence over the globally configured
    /// `data_dir`.
    ///
    /// # Errors
    ///
    /// - `DataDirError::MissingDataDir` when neither a local nor a global directory is set.
    /// - `DataDirError::DoesNotExist` when the resolved path does not exist.
    /// - `DataDirError::NotWritable` when the path's permissions are read-only or its
    ///   metadata cannot be read.
    pub fn resolve_and_validate_data_dir(
        &self,
        local_data_dir: Option<&PathBuf>,
    ) -> crate::Result<PathBuf> {
        // Convert with `.into()` exactly like the other failure paths below, so every
        // `DataDirError` reaches the caller wrapped the same way.
        let data_dir = match local_data_dir.or(self.data_dir.as_ref()) {
            Some(dir) => dir.clone(),
            None => return Err(DataDirError::MissingDataDir.into()),
        };
        if !data_dir.exists() {
            return Err(DataDirError::DoesNotExist { data_dir }.into());
        }
        // A metadata failure is treated the same as a read-only directory.
        let readonly = std::fs::metadata(&data_dir)
            .map(|meta| meta.permissions().readonly())
            .unwrap_or(true);
        if readonly {
            return Err(DataDirError::NotWritable { data_dir }.into());
        }
        Ok(data_dir)
    }

    /// Resolves the data directory and creates `subdir` inside it.
    ///
    /// The directory is resolved and validated with
    /// `resolve_and_validate_data_dir`; the subdirectory (and any missing intermediate
    /// directories) is then created. Returns the full path of the subdirectory.
    ///
    /// # Errors
    ///
    /// Propagates the validation errors, or returns `DataDirError::CouldNotCreate`
    /// when the subdirectory cannot be created.
    pub fn resolve_and_make_data_subdir(
        &self,
        local: Option<&PathBuf>,
        subdir: &str,
    ) -> crate::Result<PathBuf> {
        let data_dir = self.resolve_and_validate_data_dir(local)?;
        let data_subdir = data_dir.join(subdir);
        DirBuilder::new()
            .recursive(true)
            .create(&data_subdir)
            .with_context(|_| CouldNotCreateSnafu { subdir, data_dir })?;
        Ok(data_subdir)
    }

    /// Merges `with` into a copy of `self`.
    ///
    /// A value set on only one side is kept. A value set to different things on both
    /// sides is a conflict. For `data_dir`, a side counts as unset when it is `None`
    /// or equal to `default_data_dir()`.
    ///
    /// # Errors
    ///
    /// Returns every conflict found (plus any errors reported by the log schema
    /// merge) as a list of messages; nothing is merged in that case.
    pub fn merge(&self, with: Self) -> Result<Self, Vec<String>> {
        let mut errors = Vec::new();

        if conflicts(self.proxy.http.as_ref(), with.proxy.http.as_ref()) {
            errors.push("conflicting values for 'proxy.http' found".to_owned());
        }
        if conflicts(self.proxy.https.as_ref(), with.proxy.https.as_ref()) {
            errors.push("conflicting values for 'proxy.https' found".to_owned());
        }
        // Two non-empty `no_proxy` lists are always reported, even when equal.
        if !self.proxy.no_proxy.is_empty() && !with.proxy.no_proxy.is_empty() {
            errors.push("conflicting values for 'proxy.no_proxy' found".to_owned());
        }
        if conflicts(self.timezone.as_ref(), with.timezone.as_ref()) {
            errors.push("conflicting values for 'timezone' found".to_owned());
        }
        if conflicts(
            self.acknowledgements.enabled.as_ref(),
            with.acknowledgements.enabled.as_ref(),
        ) {
            errors.push("conflicting values for 'acknowledgements' found".to_owned());
        }
        if conflicts(self.expire_metrics.as_ref(), with.expire_metrics.as_ref()) {
            errors.push("conflicting values for 'expire_metrics' found".to_owned());
        }
        if conflicts(
            self.expire_metrics_secs.as_ref(),
            with.expire_metrics_secs.as_ref(),
        ) {
            errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
        }

        // Both sides are judged by the same rule so the outcome does not depend on
        // which side happens to be `self`.
        let default_dir = default_data_dir();
        let is_unset = |dir: &Option<PathBuf>| dir.is_none() || *dir == default_dir;
        let data_dir = if is_unset(&self.data_dir) {
            // Prefer the other side, but do not drop our own (default) value when the
            // other side has nothing at all.
            with.data_dir.or_else(|| self.data_dir.clone())
        } else if is_unset(&with.data_dir) || self.data_dir == with.data_dir {
            self.data_dir.clone()
        } else {
            // Both sides explicitly set different directories.
            errors.push("conflicting values for 'data_dir' found".to_owned());
            None
        };

        let mut log_schema = self.log_schema.clone();
        if let Err(merge_errors) = log_schema.merge(&with.log_schema) {
            errors.extend(merge_errors);
        }

        let mut telemetry = self.telemetry.clone();
        telemetry.merge(&with.telemetry);

        if errors.is_empty() {
            Ok(Self {
                data_dir,
                log_schema,
                telemetry,
                acknowledgements: self.acknowledgements.merge_default(&with.acknowledgements),
                timezone: self.timezone.or(with.timezone),
                proxy: self.proxy.merge(&with.proxy),
                expire_metrics: self.expire_metrics.or(with.expire_metrics),
                expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
            })
        } else {
            Err(errors)
        }
    }

    /// Returns the configured time zone, or `TimeZone::Local` when none is set.
    pub fn timezone(&self) -> TimeZone {
        self.timezone.unwrap_or(TimeZone::Local)
    }
}
/// Reports whether two optional values are both present and differ.
///
/// A missing value on either side never conflicts.
fn conflicts<T: PartialEq>(this: Option<&T>, that: Option<&T>) -> bool {
    match (this, that) {
        (Some(left), Some(right)) => left != right,
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use chrono_tz::Tz;

    use super::*;

    /// `data_dir`: a value on one side wins, equal values are kept, different values conflict.
    #[test]
    fn merges_data_dir() {
        let merged = |left, right| merge("data_dir", left, right, |options| options.data_dir);

        assert_eq!(merged(None, None), Ok(default_data_dir()));
        assert_eq!(merged(Some("/test1"), None), Ok(Some("/test1".into())));
        assert_eq!(merged(None, Some("/test2")), Ok(Some("/test2".into())));
        assert_eq!(
            merged(Some("/test3"), Some("/test3")),
            Ok(Some("/test3".into()))
        );
        assert_eq!(
            merged(Some("/test4"), Some("/test5")),
            Err(vec!["conflicting values for 'data_dir' found".into()])
        );
    }

    /// `timezone`: unset resolves to local time; differing named zones conflict.
    #[test]
    fn merges_timezones() {
        let merged = |left, right| merge("timezone", left, right, |options| options.timezone());

        assert_eq!(merged(None, None), Ok(TimeZone::Local));
        assert_eq!(merged(Some("local"), None), Ok(TimeZone::Local));
        assert_eq!(merged(None, Some("local")), Ok(TimeZone::Local));
        assert_eq!(merged(Some("local"), Some("local")), Ok(TimeZone::Local));
        assert_eq!(merged(Some("UTC"), None), Ok(TimeZone::Named(Tz::UTC)));
        assert_eq!(
            merged(None, Some("EST5EDT")),
            Ok(TimeZone::Named(Tz::EST5EDT))
        );
        assert_eq!(
            merged(Some("UTC"), Some("UTC")),
            Ok(TimeZone::Named(Tz::UTC))
        );
        assert_eq!(
            merged(Some("CST6CDT"), Some("GMT")),
            Err(vec!["conflicting values for 'timezone' found".into()])
        );
    }

    /// `proxy.http`: same rules as the other optional settings.
    #[test]
    fn merges_proxy() {
        let merged = |left, right| merge("proxy.http", left, right, |options| options.proxy.http);

        assert_eq!(merged(None, None), Ok(None));
        assert_eq!(merged(Some("test1"), None), Ok(Some("test1".into())));
        assert_eq!(merged(None, Some("test2")), Ok(Some("test2".into())));
        assert_eq!(
            merged(Some("test3"), Some("test3")),
            Ok(Some("test3".into()))
        );
        assert_eq!(
            merged(Some("test4"), Some("test5")),
            Err(vec!["conflicting values for 'proxy.http' found".into()])
        );
    }

    /// `acknowledgements`: given as a bare boolean; opposite booleans conflict.
    #[test]
    fn merges_acknowledgements() {
        let merged = |left, right| {
            merge("acknowledgements", left, right, |options| {
                options.acknowledgements
            })
        };

        assert_eq!(merged(None, None), Ok(None.into()));
        assert_eq!(merged(Some(false), None), Ok(false.into()));
        assert_eq!(merged(Some(true), None), Ok(true.into()));
        assert_eq!(merged(None, Some(false)), Ok(false.into()));
        assert_eq!(merged(None, Some(true)), Ok(true.into()));
        assert_eq!(merged(Some(false), Some(false)), Ok(false.into()));
        assert_eq!(merged(Some(true), Some(true)), Ok(true.into()));
        assert_eq!(
            merged(Some(false), Some(true)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
        assert_eq!(
            merged(Some(true), Some(false)),
            Err(vec![
                "conflicting values for 'acknowledgements' found".into()
            ])
        );
    }

    /// `expire_metrics_secs`: same rules as the other optional settings.
    #[test]
    fn merges_expire_metrics() {
        let merged = |left, right| {
            merge("expire_metrics_secs", left, right, |options| {
                options.expire_metrics_secs
            })
        };

        assert_eq!(merged(None, None), Ok(None));
        assert_eq!(merged(Some(1.0), None), Ok(Some(1.0)));
        assert_eq!(merged(None, Some(2.0)), Ok(Some(2.0)));
        assert_eq!(merged(Some(3.0), Some(3.0)), Ok(Some(3.0)));
        assert_eq!(
            merged(Some(4.0), Some(5.0)),
            Err(vec![
                "conflicting values for 'expire_metrics_secs' found".into()
            ])
        );
    }

    /// Builds one config per optional value of `name`, merges the second into the
    /// first, and projects the merged options through `result`.
    fn merge<P: Debug, T>(
        name: &str,
        dd1: Option<P>,
        dd2: Option<P>,
        result: impl Fn(GlobalOptions) -> T,
    ) -> Result<T, Vec<String>> {
        let base = make_config(name, dd1);
        let overlay = make_config(name, dd2);
        base.merge(overlay).map(result)
    }

    /// Parses a TOML document holding at most the single `name = value` assignment;
    /// `None` yields an empty document.
    fn make_config<P: Debug>(name: &str, value: Option<P>) -> GlobalOptions {
        let document = match value {
            Some(value) => format!(r#"{name} = {value:?}"#),
            None => String::new(),
        };
        toml::from_str(&document).unwrap()
    }
}