use std::sync::{Arc, Mutex};
use http::Uri;
use indoc::indoc;
use snafu::ResultExt;
use tokio::sync::oneshot::{channel, Sender};
use tower::ServiceBuilder;
use vector_lib::config::{proxy::ProxyConfig, AcknowledgementsConfig};
use vector_lib::configurable::configurable_component;
use super::{
apm_stats::{flush_apm_stats_thread, Aggregator},
service::TraceApiRetry,
};
use crate::common::datadog;
use crate::{
config::{GenerateConfig, Input, SinkConfig, SinkContext},
http::HttpClient,
sinks::{
datadog::{
traces::{
request_builder::DatadogTracesRequestBuilder, service::TraceApiService,
sink::TracesSink,
},
DatadogCommonConfig, LocalDatadogCommonConfig,
},
util::{
service::ServiceBuilderExt, BatchConfig, Compression, SinkBatchSettings,
TowerRequestConfig,
},
Healthcheck, UriParseSnafu, VectorSink,
},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};
pub const BATCH_GOAL_BYTES: usize = 3_000_000;
pub const BATCH_MAX_EVENTS: usize = 1_000;
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;
pub const PAYLOAD_LIMIT: usize = 3_200_000;
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogTracesDefaultBatchSettings;
impl SinkBatchSettings for DatadogTracesDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
}
#[configurable_component(sink("datadog_traces", "Publish trace events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogTracesConfig {
#[serde(flatten)]
pub local_dd_common: LocalDatadogCommonConfig,
#[configurable(derived)]
#[serde(default)]
pub compression: Option<Compression>,
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<DatadogTracesDefaultBatchSettings>,
#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
}
impl GenerateConfig for DatadogTracesConfig {
fn generate_config() -> toml::Value {
toml::from_str(indoc! {r#"
default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
"#})
.unwrap()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DatadogTracesEndpoint {
Traces,
#[allow(dead_code)] APMStats,
}
#[derive(Clone)]
pub struct DatadogTracesEndpointConfiguration {
traces_endpoint: Uri,
stats_endpoint: Uri,
}
impl DatadogTracesEndpointConfiguration {
pub fn get_uri_for_endpoint(&self, endpoint: DatadogTracesEndpoint) -> Uri {
match endpoint {
DatadogTracesEndpoint::Traces => self.traces_endpoint.clone(),
DatadogTracesEndpoint::APMStats => self.stats_endpoint.clone(),
}
}
}
impl DatadogTracesConfig {
fn get_base_uri(&self, dd_common: &DatadogCommonConfig) -> String {
dd_common
.endpoint
.clone()
.unwrap_or_else(|| format!("https://trace.agent.{}", dd_common.site))
}
fn generate_traces_endpoint_configuration(
&self,
dd_common: &DatadogCommonConfig,
) -> crate::Result<DatadogTracesEndpointConfiguration> {
let base_uri = self.get_base_uri(dd_common);
let traces_endpoint = build_uri(&base_uri, "/api/v0.2/traces")?;
let stats_endpoint = build_uri(&base_uri, "/api/v0.2/stats")?;
Ok(DatadogTracesEndpointConfiguration {
traces_endpoint,
stats_endpoint,
})
}
pub fn build_sink(
&self,
dd_common: &DatadogCommonConfig,
client: HttpClient,
) -> crate::Result<VectorSink> {
let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
let request_limits = self.request.into_settings();
let endpoints = self.generate_traces_endpoint_configuration(dd_common)?;
let batcher_settings = self
.batch
.validate()?
.limit_max_bytes(BATCH_GOAL_BYTES)?
.limit_max_events(BATCH_MAX_EVENTS)?
.into_batcher_settings()?;
let service = ServiceBuilder::new()
.settings(request_limits, TraceApiRetry)
.service(TraceApiService::new(client.clone()));
let apm_stats_aggregator =
Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key))));
let compression = self.compression.unwrap_or_else(Compression::gzip_default);
let request_builder = DatadogTracesRequestBuilder::new(
Arc::clone(&default_api_key),
endpoints.clone(),
compression,
PAYLOAD_LIMIT,
Arc::clone(&apm_stats_aggregator),
)?;
let (shutdown, tripwire) = channel::<Sender<()>>();
let sink = TracesSink::new(
service,
request_builder,
batcher_settings,
shutdown,
self.get_protocol(dd_common),
);
tokio::spawn(flush_apm_stats_thread(
tripwire,
client,
compression,
endpoints,
Arc::clone(&apm_stats_aggregator),
));
Ok(VectorSink::from_event_streamsink(sink))
}
pub fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
let tls_settings = MaybeTlsSettings::from_config(
&Some(
self.local_dd_common
.tls
.clone()
.unwrap_or_else(TlsEnableableConfig::enabled),
),
false,
)?;
Ok(HttpClient::new(tls_settings, proxy)?)
}
fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
build_uri(&self.get_base_uri(dd_common), "")
.unwrap()
.scheme_str()
.unwrap_or("http")
.to_string()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "datadog_traces")]
impl SinkConfig for DatadogTracesConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let client = self.build_client(&cx.proxy)?;
let global = cx.extra_context.get_or_default::<datadog::Options>();
let dd_common = self.local_dd_common.with_globals(global)?;
let healthcheck = dd_common.build_healthcheck(client.clone())?;
let sink = self.build_sink(&dd_common, client)?;
Ok((sink, healthcheck))
}
fn input(&self) -> Input {
Input::trace()
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.local_dd_common.acknowledgements
}
}
fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
let result = format!("{}{}", host, endpoint)
.parse::<Uri>()
.context(UriParseSnafu)?;
Ok(result)
}
#[cfg(test)]
mod test {
use super::DatadogTracesConfig;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<DatadogTracesConfig>();
}
}