use std::{convert::TryFrom, sync::Arc};
use indoc::indoc;
use tower::ServiceBuilder;
use vector_lib::{
config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
};
use vrl::value::Kind;
use crate::{
common::datadog,
http::HttpClient,
schema,
sinks::{
datadog::{logs::service::LogApiService, DatadogCommonConfig, LocalDatadogCommonConfig},
prelude::*,
util::http::RequestConfig,
},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};
use super::{service::LogApiRetry, sink::LogSinkBuilder};
/// Payload size ceiling, in bytes.
///
/// NOTE(review): this constant is not referenced anywhere else in this file; judging by its
/// name it is presumably the largest request payload the sink may produce -- confirm against
/// the request-building code that consumes it.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
/// Byte budget for a single batch.
///
/// Serves as the default `MAX_BYTES` of `DatadogLogsDefaultBatchSettings` and is also the value
/// handed to `limit_max_bytes` in `DatadogLogsConfig::build_processor`. It is 750,000 bytes
/// smaller than `MAX_PAYLOAD_BYTES`.
pub const BATCH_GOAL_BYTES: usize = 4_250_000;
/// Event-count budget for a single batch.
///
/// Serves as the default `MAX_EVENTS` of `DatadogLogsDefaultBatchSettings` and is also the value
/// handed to `limit_max_events` in `DatadogLogsConfig::build_processor`.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds (the `TIMEOUT_SECS` of `DatadogLogsDefaultBatchSettings`).
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
/// Zero-sized marker type that carries the default batch settings for the `datadog_logs` sink.
///
/// It is used as the type parameter of `BatchConfig` in `DatadogLogsConfig::batch`.
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogLogsDefaultBatchSettings;
// Every default below is taken from the batching constants declared at the top of this file.
impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
// Default event-count limit per batch.
const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
// Default byte limit per batch.
const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
// Default batch timeout, in seconds.
const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
}
/// Configuration for the `datadog_logs` sink.
#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogLogsConfig {
// Datadog settings local to this sink. Because of `serde(flatten)` its fields appear at the
// same level as this struct's own fields when (de)serialized. This file reads its `tls` and
// `acknowledgements` fields and resolves it into a `DatadogCommonConfig` via `with_globals`.
#[serde(flatten)]
pub local_dd_common: LocalDatadogCommonConfig,
// Optional compression setting. When `None`, `build_processor` falls back to the default
// `Compression` value through `unwrap_or_default`.
#[configurable(derived)]
#[serde(default)]
pub compression: Option<Compression>,
// Transformer handed to `LogSinkBuilder::new` in `build_processor`. It is left out of the
// serialized form whenever `crate::serde::is_default` returns true for it.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub encoding: Transformer,
// Batching options whose defaults come from `DatadogLogsDefaultBatchSettings`.
// `build_processor` validates them and then applies `BATCH_GOAL_BYTES` and `BATCH_MAX_EVENTS`
// through `limit_max_bytes` and `limit_max_events`.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,
// Request options. `build_processor` turns `request.tower` into the service settings and
// passes a copy of `request.headers` to `LogApiService::new`.
#[configurable(derived)]
#[serde(default)]
pub request: RequestConfig,
}
impl GenerateConfig for DatadogLogsConfig {
    /// Produces the example configuration for this sink as a TOML value.
    ///
    /// The example sets a single key, `default_api_key`, to the literal placeholder
    /// `${DATADOG_API_KEY_ENV_VAR}`.
    ///
    /// # Panics
    ///
    /// Panics if the embedded snippet cannot be parsed as TOML.
    fn generate_config() -> toml::Value {
        let example = indoc! {r#"
            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
        "#};
        toml::from_str(example).unwrap()
    }
}
impl DatadogLogsConfig {
    /// Returns the URI of the logs intake API.
    ///
    /// The base URL is `dd_common.endpoint` when one is set; otherwise it is derived from
    /// `dd_common.site` as `https://http-intake.logs.<site>`. The `/api/v2/logs` path is then
    /// appended to the base URL verbatim (no slash normalization is performed).
    ///
    /// # Panics
    ///
    /// Panics with "URI not valid" if the assembled string does not parse as a URI.
    fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
        let base_url = match &dd_common.endpoint {
            Some(endpoint) => endpoint.clone(),
            None => format!("https://http-intake.logs.{}", dd_common.site),
        };
        let logs_url = format!("{}/api/v2/logs", base_url);
        http::Uri::try_from(logs_url).expect("URI not valid")
    }

    /// Returns the scheme of the logs intake URI (see `get_uri`) as an owned string.
    ///
    /// Falls back to `"http"` when the URI carries no scheme.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as `get_uri`.
    pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
        let uri = self.get_uri(dd_common);
        let scheme = uri.scheme_str().unwrap_or("http");
        scheme.to_owned()
    }

    /// Assembles the `VectorSink` for this configuration.
    ///
    /// * `dd_common` - resolved Datadog settings; supplies the default API key and the
    ///   endpoint/site used to build the intake URI.
    /// * `client` - HTTP client handed to `LogApiService`.
    /// * `dd_evp_origin` - forwarded unchanged to `LogApiService::new`.
    ///
    /// # Errors
    ///
    /// Returns an error if batch validation, either of the batch limit steps, the conversion
    /// into batcher settings, or `LogApiService::new` fails.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as `get_uri`.
    pub fn build_processor(
        &self,
        dd_common: &DatadogCommonConfig,
        client: HttpClient,
        dd_evp_origin: String,
    ) -> crate::Result<VectorSink> {
        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
        let request_limits = self.request.tower.into_settings();

        // Batch settings are checked before anything URI-related is touched, so a bad batch
        // configuration is reported as an error first.
        let validated = self.batch.validate()?;
        let limited = validated
            .limit_max_bytes(BATCH_GOAL_BYTES)?
            .limit_max_events(BATCH_MAX_EVENTS)?;
        let batch = limited.into_batcher_settings()?;

        let builder = ServiceBuilder::new().settings(request_limits, LogApiRetry);
        let api_service = LogApiService::new(
            client,
            self.get_uri(dd_common),
            self.request.headers.clone(),
            dd_evp_origin,
        )?;
        let service = builder.service(api_service);

        let sink = LogSinkBuilder::new(
            self.encoding.clone(),
            service,
            default_api_key,
            batch,
            self.get_protocol(dd_common),
        )
        .compression(self.compression.unwrap_or_default())
        .build();

        Ok(VectorSink::from_event_streamsink(sink))
    }

    /// Creates the HTTP client used by this sink.
    ///
    /// Uses the `tls` settings from `local_dd_common` when present; otherwise falls back to
    /// `TlsEnableableConfig::enabled()`.
    ///
    /// # Errors
    ///
    /// Returns an error if the TLS settings cannot be built or the client cannot be created.
    pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
        let tls_config = self
            .local_dd_common
            .tls
            .clone()
            .unwrap_or_else(TlsEnableableConfig::enabled);
        // NOTE(review): the `false` flag presumably selects client-side rather than server-side
        // TLS settings -- confirm against `MaybeTlsSettings::from_config`.
        let tls_settings = MaybeTlsSettings::from_config(&Some(tls_config), false)?;
        let client = HttpClient::new(tls_settings, proxy)?;
        Ok(client)
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "datadog_logs")]
impl SinkConfig for DatadogLogsConfig {
    /// Builds the sink together with its healthcheck.
    ///
    /// Steps, in order: create the HTTP client from the context's proxy settings, resolve the
    /// local Datadog settings against the `datadog::Options` found in the context's extra
    /// data, build the healthcheck with a clone of the client, then build the sink itself.
    /// `cx.app_name_slug` is forwarded as the `dd_evp_origin` argument of `build_processor`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any of those steps.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let client = self.create_client(&cx.proxy)?;
        let dd_common = self
            .local_dd_common
            .with_globals(cx.extra_context.get_or_default::<datadog::Options>())?;
        let healthcheck = dd_common.build_healthcheck(client.clone())?;
        self.build_processor(&dd_common, client, cx.app_name_slug)
            .map(|sink| (sink, healthcheck))
    }

    /// Declares that this sink accepts log events and registers the optional semantic
    /// meanings it looks for, each paired with the value kind it is declared with.
    fn input(&self) -> Input {
        // Applied in this order; only the timestamp meaning uses a non-bytes kind.
        let optional_meanings = [
            (meaning::MESSAGE, Kind::bytes()),
            (meaning::TIMESTAMP, Kind::timestamp()),
            (meaning::HOST, Kind::bytes()),
            (meaning::SOURCE, Kind::bytes()),
            (meaning::SEVERITY, Kind::bytes()),
            (meaning::SERVICE, Kind::bytes()),
            (meaning::TRACE_ID, Kind::bytes()),
        ];
        let requirement = optional_meanings.into_iter().fold(
            schema::Requirement::empty(),
            |requirement, (name, kind)| requirement.optional_meaning(name, kind),
        );
        Input::log().with_schema_requirement(requirement)
    }

    /// Returns the acknowledgements settings stored in the local Datadog configuration.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        let common = &self.local_dd_common;
        &common.acknowledgements
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::codecs::EncodingConfigWithFraming;
    use crate::components::validation::prelude::*;
    use vector_lib::{
        codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
        config::LogNamespace,
    };

    /// Runs the shared generated-config check against `DatadogLogsConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    impl ValidatableComponent for DatadogLogsConfig {
        /// Describes how the component validation framework should exercise this sink: one
        /// sink test case that pushes JSON-encoded events to an HTTP resource at
        /// `http://127.0.0.1:9005/api/v2/logs`, using the legacy log namespace.
        fn validation_configuration() -> ValidationConfiguration {
            let endpoint = String::from("http://127.0.0.1:9005");
            // Formatted up front so `endpoint` can be moved into the config below.
            let logs_endpoint = format!("{endpoint}/api/v2/logs");

            let local_dd_common = LocalDatadogCommonConfig {
                endpoint: Some(endpoint),
                default_api_key: Some("unused".to_string().into()),
                ..Default::default()
            };
            let config = Self {
                local_dd_common,
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding =
                EncodingConfigWithFraming::new(None, serializer.into(), config.encoding.clone());

            let logs_uri =
                http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(logs_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));
            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}