vector/sinks/datadog/traces/
config.rs

1use std::sync::{Arc, Mutex};
2
3use http::Uri;
4use indoc::indoc;
5use snafu::ResultExt;
6use tokio::sync::oneshot::{Sender, channel};
7use tower::ServiceBuilder;
8use vector_lib::{
9    config::{AcknowledgementsConfig, proxy::ProxyConfig},
10    configurable::configurable_component,
11};
12
13use super::{
14    apm_stats::{Aggregator, flush_apm_stats_thread},
15    service::TraceApiRetry,
16};
17use crate::{
18    common::datadog,
19    config::{GenerateConfig, Input, SinkConfig, SinkContext},
20    http::HttpClient,
21    sinks::{
22        Healthcheck, UriParseSnafu, VectorSink,
23        datadog::{
24            DatadogCommonConfig, LocalDatadogCommonConfig,
25            traces::{
26                request_builder::DatadogTracesRequestBuilder, service::TraceApiService,
27                sink::TracesSink,
28            },
29        },
30        util::{
31            BatchConfig, Compression, SinkBatchSettings, TowerRequestConfig,
32            service::ServiceBuilderExt,
33        },
34    },
35    tls::{MaybeTlsSettings, TlsEnableableConfig},
36};
37
// The Datadog API has a hard limit of 3.2MB for uncompressed payloads; beyond this limit the
// payload is ignored.
/// Hard ceiling, in bytes, handed to the request builder for a single uncompressed payload.
pub const PAYLOAD_LIMIT: usize = 3_200_000;

/// Batch size goal in bytes, kept slightly below [`PAYLOAD_LIMIT`] as a safety margin.
pub const BATCH_GOAL_BYTES: usize = 3_000_000;
/// Default (and upper bound for the) number of events in one batch.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;
46
/// Zero-sized marker type that carries the default batch settings of the `datadog_traces` sink.
#[derive(Debug, Default, Clone, Copy)]
pub struct DatadogTracesDefaultBatchSettings;
49
50impl SinkBatchSettings for DatadogTracesDefaultBatchSettings {
51    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
52    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
53    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
54}
55
/// Configuration for the `datadog_traces` sink.
// NOTE(review): field notes below are plain `//` comments on the assumption that `///` text is
// picked up by `configurable_component` for the generated schema — confirm before promoting them.
#[configurable_component(sink("datadog_traces", "Publish trace events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogTracesConfig {
    // Sink-local Datadog options, flattened into this sink's own table. They are merged with the
    // global Datadog options through `with_globals` when the sink is built, and also supply the
    // TLS and acknowledgement settings used by this sink.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    // Left as `None` when unset; `build_sink` then falls back to `Compression::gzip_default()`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Option<Compression>,

    // Batch options whose defaults come from `DatadogTracesDefaultBatchSettings`; `build_sink`
    // additionally applies `BATCH_GOAL_BYTES` and `BATCH_MAX_EVENTS` as limits.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogTracesDefaultBatchSettings>,

    // Request options, converted with `into_settings()` and handed to the tower service stack.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
}
76
77impl GenerateConfig for DatadogTracesConfig {
78    fn generate_config() -> toml::Value {
79        toml::from_str(indoc! {r#"
80            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
81        "#})
82        .unwrap()
83    }
84}
85
/// The two routes exposed by the Datadog traces API: one for traces and one for stats.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DatadogTracesEndpoint {
    /// Route for trace payloads.
    Traces,
    /// Route for APM stats payloads.
    // Allowed as dead code: this variant is meant to be used once APM stats are generated.
    #[allow(dead_code)]
    APMStats,
}
93
94/// Store traces & APM stats endpoints actual URIs.
95#[derive(Clone)]
96pub struct DatadogTracesEndpointConfiguration {
97    traces_endpoint: Uri,
98    stats_endpoint: Uri,
99}
100
101impl DatadogTracesEndpointConfiguration {
102    pub fn get_uri_for_endpoint(&self, endpoint: DatadogTracesEndpoint) -> Uri {
103        match endpoint {
104            DatadogTracesEndpoint::Traces => self.traces_endpoint.clone(),
105            DatadogTracesEndpoint::APMStats => self.stats_endpoint.clone(),
106        }
107    }
108}
109
110impl DatadogTracesConfig {
111    fn get_base_uri(&self, dd_common: &DatadogCommonConfig) -> String {
112        dd_common
113            .endpoint
114            .clone()
115            .unwrap_or_else(|| format!("https://trace.agent.{}", dd_common.site))
116    }
117
118    fn generate_traces_endpoint_configuration(
119        &self,
120        dd_common: &DatadogCommonConfig,
121    ) -> crate::Result<DatadogTracesEndpointConfiguration> {
122        let base_uri = self.get_base_uri(dd_common);
123        let traces_endpoint = build_uri(&base_uri, "/api/v0.2/traces")?;
124        let stats_endpoint = build_uri(&base_uri, "/api/v0.2/stats")?;
125
126        Ok(DatadogTracesEndpointConfiguration {
127            traces_endpoint,
128            stats_endpoint,
129        })
130    }
131
132    pub fn build_sink(
133        &self,
134        dd_common: &DatadogCommonConfig,
135        client: HttpClient,
136    ) -> crate::Result<VectorSink> {
137        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
138        let request_limits = self.request.into_settings();
139        let endpoints = self.generate_traces_endpoint_configuration(dd_common)?;
140
141        let batcher_settings = self
142            .batch
143            .validate()?
144            .limit_max_bytes(BATCH_GOAL_BYTES)?
145            .limit_max_events(BATCH_MAX_EVENTS)?
146            .into_batcher_settings()?;
147
148        let service = ServiceBuilder::new()
149            .settings(request_limits, TraceApiRetry)
150            .service(TraceApiService::new(client.clone()));
151
152        // Object responsible for caching/processing APM stats from incoming trace events.
153        let apm_stats_aggregator =
154            Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key))));
155
156        let compression = self.compression.unwrap_or_else(Compression::gzip_default);
157
158        let request_builder = DatadogTracesRequestBuilder::new(
159            Arc::clone(&default_api_key),
160            endpoints.clone(),
161            compression,
162            PAYLOAD_LIMIT,
163            Arc::clone(&apm_stats_aggregator),
164        )?;
165
166        // shutdown= Sender that the sink signals when input stream is exhausted.
167        // tripwire= Receiver that APM stats flush thread listens for exit signal on.
168        let (shutdown, tripwire) = channel::<Sender<()>>();
169
170        let sink = TracesSink::new(
171            service,
172            request_builder,
173            batcher_settings,
174            shutdown,
175            self.get_protocol(dd_common),
176        );
177
178        // Send the APM stats payloads independently of the sink framework.
179        // This is necessary to comply with what the APM stats backend of Datadog expects with
180        // respect to receiving stats payloads.
181        tokio::spawn(flush_apm_stats_thread(
182            tripwire,
183            client,
184            compression,
185            endpoints,
186            Arc::clone(&apm_stats_aggregator),
187        ));
188
189        Ok(VectorSink::from_event_streamsink(sink))
190    }
191
192    pub fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
193        let default_tls_config;
194
195        let tls_settings = MaybeTlsSettings::from_config(
196            Some(match self.local_dd_common.tls.as_ref() {
197                Some(config) => config,
198                None => {
199                    default_tls_config = TlsEnableableConfig::enabled();
200                    &default_tls_config
201                }
202            }),
203            false,
204        )?;
205        Ok(HttpClient::new(tls_settings, proxy)?)
206    }
207
208    fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
209        build_uri(&self.get_base_uri(dd_common), "")
210            .unwrap()
211            .scheme_str()
212            .unwrap_or("http")
213            .to_string()
214    }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "datadog_traces")]
219impl SinkConfig for DatadogTracesConfig {
220    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
221        let client = self.build_client(&cx.proxy)?;
222        let global = cx.extra_context.get_or_default::<datadog::Options>();
223        let dd_common = self.local_dd_common.with_globals(global)?;
224        let healthcheck = dd_common.build_healthcheck(client.clone())?;
225        let sink = self.build_sink(&dd_common, client)?;
226
227        Ok((sink, healthcheck))
228    }
229
230    fn input(&self) -> Input {
231        Input::trace()
232    }
233
234    fn acknowledgements(&self) -> &AcknowledgementsConfig {
235        &self.local_dd_common.acknowledgements
236    }
237}
238
239fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
240    let result = format!("{host}{endpoint}")
241        .parse::<Uri>()
242        .context(UriParseSnafu)?;
243    Ok(result)
244}
245
#[cfg(test)]
mod test {
    /// Runs the crate's shared `test_generate_config` helper against this sink's configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::DatadogTracesConfig>();
    }
}