use futures_util::FutureExt;
use http::{Request, StatusCode, Uri};
use hyper::body::Body;
use snafu::Snafu;
use vector_lib::{
config::AcknowledgementsConfig, configurable::configurable_component,
sensitive_string::SensitiveString, tls::TlsEnableableConfig,
};
use super::Healthcheck;
use crate::{
common::datadog,
http::{HttpClient, HttpError},
sinks::HealthcheckError,
};
// Each Datadog submodule is compiled only when its matching
// `sinks-datadog_*` Cargo feature is enabled.
#[cfg(feature = "sinks-datadog_events")]
pub mod events;
#[cfg(feature = "sinks-datadog_logs")]
pub mod logs;
#[cfg(feature = "sinks-datadog_metrics")]
pub mod metrics;
// `test_utils` is built only for the logs or metrics sinks, and only when
// either the `test-utils` feature is enabled or the crate is compiled for
// tests.
#[cfg(any(
all(feature = "sinks-datadog_logs", feature = "test-utils"),
all(feature = "sinks-datadog_metrics", feature = "test-utils"),
all(feature = "sinks-datadog_logs", test),
all(feature = "sinks-datadog_metrics", test)
))]
pub mod test_utils;
#[cfg(feature = "sinks-datadog_traces")]
pub mod traces;
/// Datadog settings as declared on an individual component.
///
/// The connection fields are all optional at this level.
/// [`LocalDatadogCommonConfig::with_globals`] merges them with the global
/// Datadog options to produce a fully resolved [`DatadogCommonConfig`].
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct LocalDatadogCommonConfig {
/// The endpoint to send data to.
///
/// The examples include an HTTP scheme, a host, and a port.
// NOTE(review): this value is handed to `datadog::get_api_base_endpoint`
// together with `site`; presumably it takes precedence over `site` when
// set — confirm against that helper.
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(docs::examples = "http://127.0.0.1:8080"))]
#[configurable(metadata(docs::examples = "http://example.com:12345"))]
#[serde(default)]
pub endpoint: Option<String>,
/// The Datadog site to send data to.
///
/// When unset, the `site` from the global Datadog options is used.
#[configurable(metadata(docs::examples = "us3.datadoghq.com"))]
#[configurable(metadata(docs::examples = "datadoghq.eu"))]
pub site: Option<String>,
/// The default Datadog API key to use.
///
/// When unset, the `api_key` from the global Datadog options is used.
/// Resolving the configuration fails if neither is set.
#[configurable(metadata(docs::examples = "${DATADOG_API_KEY_ENV_VAR}"))]
#[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
pub default_api_key: Option<SensitiveString>,
// Optional TLS settings. This field is not carried over into
// `DatadogCommonConfig` by `with_globals`.
#[configurable(derived)]
#[serde(default)]
pub tls: Option<TlsEnableableConfig>,
// Acknowledgement settings. Deserialized through
// `crate::serde::bool_or_struct` and skipped on serialization when
// `crate::serde::is_default` returns true for the value.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}
impl LocalDatadogCommonConfig {
    /// Creates a configuration from the three connection settings.
    ///
    /// `tls` and `acknowledgements` are left at their `Default` values.
    pub fn new(
        endpoint: Option<String>,
        site: Option<String>,
        default_api_key: Option<SensitiveString>,
    ) -> Self {
        Self {
            endpoint,
            site,
            default_api_key,
            ..Self::default()
        }
    }

    /// Resolves this component-level configuration against the global
    /// Datadog options.
    ///
    /// Values set on the component win; `site` and the API key fall back to
    /// the corresponding global value when the component leaves them unset.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigurationError::ApiKeyRequired`] when neither the
    /// component nor the global options provide an API key.
    pub fn with_globals(
        &self,
        config: datadog::Options,
    ) -> Result<DatadogCommonConfig, ConfigurationError> {
        // Component-level key first, then the global one; no key at all is a
        // configuration error.
        let default_api_key = match self.default_api_key.clone() {
            Some(local_key) => local_key,
            None => config
                .api_key
                .ok_or(ConfigurationError::ApiKeyRequired)?,
        };

        let site = match &self.site {
            Some(local_site) => local_site.clone(),
            None => config.site,
        };

        Ok(DatadogCommonConfig {
            endpoint: self.endpoint.clone(),
            site,
            default_api_key,
            acknowledgements: self.acknowledgements,
        })
    }
}
/// Errors raised while resolving a [`LocalDatadogCommonConfig`] against the
/// global Datadog options.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum ConfigurationError {
/// No API key was set on the component or in the global options.
#[snafu(display("API Key must be specified."))]
ApiKeyRequired,
}
/// Fully resolved Datadog settings, as produced by
/// `LocalDatadogCommonConfig::with_globals`.
///
/// Unlike the component-level configuration, `site` and `default_api_key`
/// are always present here.
#[derive(Clone, Debug, Default)]
pub struct DatadogCommonConfig {
/// Optional endpoint; passed with `site` to
/// `datadog::get_api_base_endpoint` when building API URIs.
pub endpoint: Option<String>,
/// The Datadog site, from the component or the global options.
pub site: String,
/// API key sent in the `DD-API-KEY` header of the healthcheck request.
pub default_api_key: SensitiveString,
/// Acknowledgement settings copied from the component configuration.
pub acknowledgements: AcknowledgementsConfig,
}
impl DatadogCommonConfig {
    /// Builds the healthcheck for this configuration.
    ///
    /// The returned future issues a request against the `/api/v1/validate`
    /// path of the API base endpoint, authenticated with the default API key.
    ///
    /// # Errors
    ///
    /// Fails when the validation URI cannot be constructed.
    pub fn build_healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
        let uri = self.get_api_endpoint("/api/v1/validate")?;
        let key: String = self.default_api_key.clone().into();
        let healthcheck = build_healthcheck_future(client, uri, key).boxed();
        Ok(healthcheck)
    }

    /// Appends `path` to the API base endpoint derived from `endpoint` and
    /// `site`, and parses the result as a [`Uri`].
    fn get_api_endpoint(&self, path: &str) -> crate::Result<Uri> {
        let base = datadog::get_api_base_endpoint(self.endpoint.as_deref(), self.site.as_str());
        let uri = format!("{base}{path}").parse::<Uri>()?;
        Ok(uri)
    }
}
/// Validates an API key by sending a `GET` request to `validate_endpoint`.
///
/// The key is sent in the `DD-API-KEY` header. The healthcheck passes only
/// when the response status is exactly `200 OK`.
///
/// # Errors
///
/// - The request cannot be built, e.g. because `api_key` contains characters
///   that are not allowed in an HTTP header value.
/// - Sending the request fails.
/// - The response has any status other than `200 OK`
///   ([`HealthcheckError::UnexpectedStatus`]).
async fn build_healthcheck_future(
    client: HttpClient,
    validate_endpoint: Uri,
    api_key: String,
) -> crate::Result<()> {
    // The request builder defers header validation until `body()`. The API
    // key is user-supplied, so an invalid value (such as one with a trailing
    // newline) must fail the healthcheck rather than panic.
    let request = Request::get(validate_endpoint)
        .header("DD-API-KEY", api_key)
        .body(hyper::Body::empty())?;

    let response = client.send(request).await?;
    match response.status() {
        StatusCode::OK => Ok(()),
        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
    }
}
/// Failure categories for a request to the Datadog API.
///
/// `DatadogApiError::from_result` maps transport errors and non-success
/// HTTP statuses onto these variants.
#[derive(Debug, Snafu)]
pub enum DatadogApiError {
/// The request itself failed; wraps the underlying `HttpError`.
#[snafu(display("Failed to make HTTP(S) request: {}", error))]
HttpError { error: HttpError },
/// The response status was `400 Bad Request`.
#[snafu(display("Client request was not valid for unknown reasons."))]
BadRequest,
/// The response status was `401 Unauthorized`.
#[snafu(display("Client request was unauthorized."))]
Unauthorized,
/// The response status was `403 Forbidden`.
#[snafu(display("Client request was forbidden."))]
Forbidden,
/// The response status was `408 Request Timeout`.
#[snafu(display("Client request timed out."))]
RequestTimeout,
/// The response status was `413 Payload Too Large`.
#[snafu(display("Client sent a payload that is too large."))]
PayloadTooLarge,
/// The response status was `429 Too Many Requests`.
#[snafu(display("Client sent too many requests (rate limiting)."))]
TooManyRequests,
/// The response status was a client error not covered by another variant.
#[snafu(display("Client request was invalid."))]
ClientError,
/// The response status was neither a success nor a client error.
#[snafu(display("Server responded with an error."))]
ServerError,
}
impl DatadogApiError {
    /// Classifies the outcome of an HTTP request to the Datadog API.
    ///
    /// A response with a success status is returned unchanged. A transport
    /// error is wrapped in [`DatadogApiError::HttpError`]. Any other status
    /// is mapped to the matching variant: specific client statuses first,
    /// then the generic client-error variant, and finally
    /// [`DatadogApiError::ServerError`] for everything else.
    pub fn from_result(
        result: Result<http::Response<Body>, HttpError>,
    ) -> Result<http::Response<Body>, DatadogApiError> {
        let response = result.map_err(|error| Self::HttpError { error })?;

        let status = response.status();
        if status.is_success() {
            return Ok(response);
        }

        Err(match status {
            StatusCode::BAD_REQUEST => Self::BadRequest,
            StatusCode::UNAUTHORIZED => Self::Unauthorized,
            StatusCode::FORBIDDEN => Self::Forbidden,
            StatusCode::REQUEST_TIMEOUT => Self::RequestTimeout,
            StatusCode::PAYLOAD_TOO_LARGE => Self::PayloadTooLarge,
            StatusCode::TOO_MANY_REQUESTS => Self::TooManyRequests,
            client if client.is_client_error() => Self::ClientError,
            _ => Self::ServerError,
        })
    }

    /// Reports whether a request that failed with this error may be retried.
    ///
    /// Bad requests and oversized payloads are never retried. Transport
    /// errors defer to the wrapped error's own `is_retriable`. Every other
    /// variant is retried.
    pub const fn is_retriable(&self) -> bool {
        match self {
            Self::BadRequest | Self::PayloadTooLarge => false,
            Self::HttpError { error } => error.is_retriable(),
            Self::Unauthorized
            | Self::Forbidden
            | Self::RequestTimeout
            | Self::TooManyRequests
            | Self::ClientError
            | Self::ServerError => true,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds global Datadog options with site `tomato.com` and the given
    /// API key.
    fn global_options(api_key: Option<&str>) -> datadog::Options {
        datadog::Options {
            api_key: api_key.map(|key| key.to_string().into()),
            site: "tomato.com".into(),
        }
    }

    /// Values set on the component take precedence over the global ones.
    #[test]
    fn local_config_with_no_overrides() {
        let component = LocalDatadogCommonConfig::new(
            None,
            Some("potato.com".into()),
            Some("key".to_string().into()),
        );

        let resolved = component
            .with_globals(global_options(Some("more key")))
            .unwrap();

        assert_eq!(None, resolved.endpoint);
        assert_eq!("potato.com".to_string(), resolved.site);
        assert_eq!(
            SensitiveString::from("key".to_string()),
            resolved.default_api_key
        );
    }

    /// Unset component values are filled in from the global options.
    #[test]
    fn local_config_with_overrides() {
        let component = LocalDatadogCommonConfig::new(None, None, None);

        let resolved = component
            .with_globals(global_options(Some("more key")))
            .unwrap();

        assert_eq!(None, resolved.endpoint);
        assert_eq!("tomato.com".to_string(), resolved.site);
        assert_eq!(
            SensitiveString::from("more key".to_string()),
            resolved.default_api_key
        );
    }

    /// Resolution fails when no API key is available anywhere.
    #[test]
    fn no_api_key() {
        let component = LocalDatadogCommonConfig::new(None, None, None);

        let failure = component.with_globals(global_options(None)).unwrap_err();

        assert_eq!(ConfigurationError::ApiKeyRequired, failure);
    }
}