vector/sinks/datadog/traces/
config.rs

1use std::sync::{Arc, Mutex};
2
3use http::Uri;
4use indoc::indoc;
5use snafu::ResultExt;
6use tokio::sync::oneshot::{channel, Sender};
7use tower::ServiceBuilder;
8use vector_lib::config::{proxy::ProxyConfig, AcknowledgementsConfig};
9use vector_lib::configurable::configurable_component;
10
11use super::{
12    apm_stats::{flush_apm_stats_thread, Aggregator},
13    service::TraceApiRetry,
14};
15use crate::common::datadog;
16use crate::{
17    config::{GenerateConfig, Input, SinkConfig, SinkContext},
18    http::HttpClient,
19    sinks::{
20        datadog::{
21            traces::{
22                request_builder::DatadogTracesRequestBuilder, service::TraceApiService,
23                sink::TracesSink,
24            },
25            DatadogCommonConfig, LocalDatadogCommonConfig,
26        },
27        util::{
28            service::ServiceBuilderExt, BatchConfig, Compression, SinkBatchSettings,
29            TowerRequestConfig,
30        },
31        Healthcheck, UriParseSnafu, VectorSink,
32    },
33    tls::{MaybeTlsSettings, TlsEnableableConfig},
34};
35
// The Datadog API has a hard limit of 3.2MB for uncompressed payloads.
// Payloads beyond this limit are ignored, so batches aim for a slightly lower
// size as a safety margin.
/// Default and maximum batch size, in bytes; deliberately below `PAYLOAD_LIMIT`.
pub const BATCH_GOAL_BYTES: usize = 3_000_000;
/// Default and maximum number of events in one batch.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;

/// Payload size limit handed to the traces request builder.
// NOTE(review): presumably the 3.2MB API limit described above, in bytes — confirm.
pub const PAYLOAD_LIMIT: usize = 3_200_000;
44
/// Marker type carrying the default batch settings for the `datadog_traces` sink.
///
/// It holds no data; it only exists to be plugged into `BatchConfig` as its
/// `SinkBatchSettings` parameter.
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogTracesDefaultBatchSettings;

// Defaults are taken from the module-level batch constants so that the defaults
// and the caps enforced when the sink is built stay in sync.
impl SinkBatchSettings for DatadogTracesDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
}
53
/// Configuration for the `datadog_traces` sink.
// NOTE(review): this struct goes through `configurable_component`; field notes
// below are plain `//` comments on purpose, because `///` doc comments may be
// picked up as generated configuration metadata — confirm before converting.
#[configurable_component(sink("datadog_traces", "Publish trace events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogTracesConfig {
    // Sink-local Datadog settings, flattened into this sink's own options.
    // Provides the TLS and acknowledgements settings used in this file and is
    // merged with the global Datadog options via `with_globals` at build time.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    // Compression applied to outgoing payloads. When unset, `build_sink` falls
    // back to `Compression::gzip_default()`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Option<Compression>,

    // Batching options; defaults come from `DatadogTracesDefaultBatchSettings`
    // and are capped at `BATCH_GOAL_BYTES` / `BATCH_MAX_EVENTS` in `build_sink`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogTracesDefaultBatchSettings>,

    // Request settings converted with `into_settings()` and applied to the
    // tower service that sends trace payloads.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
}
74
75impl GenerateConfig for DatadogTracesConfig {
76    fn generate_config() -> toml::Value {
77        toml::from_str(indoc! {r#"
78            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
79        "#})
80        .unwrap()
81    }
82}
83
/// Datadog traces API has two routes: one for traces and another one for stats.
///
/// Values of this enum are used as selectors by
/// `DatadogTracesEndpointConfiguration::get_uri_for_endpoint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DatadogTracesEndpoint {
    /// Selects the traces URI (built from the `/api/v0.2/traces` path).
    Traces,
    /// Selects the APM stats URI (built from the `/api/v0.2/stats` path).
    #[allow(dead_code)] // This will be used when APM stats will be generated
    APMStats,
}
91
92/// Store traces & APM stats endpoints actual URIs.
93#[derive(Clone)]
94pub struct DatadogTracesEndpointConfiguration {
95    traces_endpoint: Uri,
96    stats_endpoint: Uri,
97}
98
99impl DatadogTracesEndpointConfiguration {
100    pub fn get_uri_for_endpoint(&self, endpoint: DatadogTracesEndpoint) -> Uri {
101        match endpoint {
102            DatadogTracesEndpoint::Traces => self.traces_endpoint.clone(),
103            DatadogTracesEndpoint::APMStats => self.stats_endpoint.clone(),
104        }
105    }
106}
107
108impl DatadogTracesConfig {
109    fn get_base_uri(&self, dd_common: &DatadogCommonConfig) -> String {
110        dd_common
111            .endpoint
112            .clone()
113            .unwrap_or_else(|| format!("https://trace.agent.{}", dd_common.site))
114    }
115
116    fn generate_traces_endpoint_configuration(
117        &self,
118        dd_common: &DatadogCommonConfig,
119    ) -> crate::Result<DatadogTracesEndpointConfiguration> {
120        let base_uri = self.get_base_uri(dd_common);
121        let traces_endpoint = build_uri(&base_uri, "/api/v0.2/traces")?;
122        let stats_endpoint = build_uri(&base_uri, "/api/v0.2/stats")?;
123
124        Ok(DatadogTracesEndpointConfiguration {
125            traces_endpoint,
126            stats_endpoint,
127        })
128    }
129
130    pub fn build_sink(
131        &self,
132        dd_common: &DatadogCommonConfig,
133        client: HttpClient,
134    ) -> crate::Result<VectorSink> {
135        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
136        let request_limits = self.request.into_settings();
137        let endpoints = self.generate_traces_endpoint_configuration(dd_common)?;
138
139        let batcher_settings = self
140            .batch
141            .validate()?
142            .limit_max_bytes(BATCH_GOAL_BYTES)?
143            .limit_max_events(BATCH_MAX_EVENTS)?
144            .into_batcher_settings()?;
145
146        let service = ServiceBuilder::new()
147            .settings(request_limits, TraceApiRetry)
148            .service(TraceApiService::new(client.clone()));
149
150        // Object responsible for caching/processing APM stats from incoming trace events.
151        let apm_stats_aggregator =
152            Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key))));
153
154        let compression = self.compression.unwrap_or_else(Compression::gzip_default);
155
156        let request_builder = DatadogTracesRequestBuilder::new(
157            Arc::clone(&default_api_key),
158            endpoints.clone(),
159            compression,
160            PAYLOAD_LIMIT,
161            Arc::clone(&apm_stats_aggregator),
162        )?;
163
164        // shutdown= Sender that the sink signals when input stream is exhausted.
165        // tripwire= Receiver that APM stats flush thread listens for exit signal on.
166        let (shutdown, tripwire) = channel::<Sender<()>>();
167
168        let sink = TracesSink::new(
169            service,
170            request_builder,
171            batcher_settings,
172            shutdown,
173            self.get_protocol(dd_common),
174        );
175
176        // Send the APM stats payloads independently of the sink framework.
177        // This is necessary to comply with what the APM stats backend of Datadog expects with
178        // respect to receiving stats payloads.
179        tokio::spawn(flush_apm_stats_thread(
180            tripwire,
181            client,
182            compression,
183            endpoints,
184            Arc::clone(&apm_stats_aggregator),
185        ));
186
187        Ok(VectorSink::from_event_streamsink(sink))
188    }
189
190    pub fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
191        let default_tls_config;
192
193        let tls_settings = MaybeTlsSettings::from_config(
194            Some(match self.local_dd_common.tls.as_ref() {
195                Some(config) => config,
196                None => {
197                    default_tls_config = TlsEnableableConfig::enabled();
198                    &default_tls_config
199                }
200            }),
201            false,
202        )?;
203        Ok(HttpClient::new(tls_settings, proxy)?)
204    }
205
206    fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
207        build_uri(&self.get_base_uri(dd_common), "")
208            .unwrap()
209            .scheme_str()
210            .unwrap_or("http")
211            .to_string()
212    }
213}
214
215#[async_trait::async_trait]
216#[typetag::serde(name = "datadog_traces")]
217impl SinkConfig for DatadogTracesConfig {
218    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
219        let client = self.build_client(&cx.proxy)?;
220        let global = cx.extra_context.get_or_default::<datadog::Options>();
221        let dd_common = self.local_dd_common.with_globals(global)?;
222        let healthcheck = dd_common.build_healthcheck(client.clone())?;
223        let sink = self.build_sink(&dd_common, client)?;
224
225        Ok((sink, healthcheck))
226    }
227
228    fn input(&self) -> Input {
229        Input::trace()
230    }
231
232    fn acknowledgements(&self) -> &AcknowledgementsConfig {
233        &self.local_dd_common.acknowledgements
234    }
235}
236
237fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
238    let result = format!("{host}{endpoint}")
239        .parse::<Uri>()
240        .context(UriParseSnafu)?;
241    Ok(result)
242}
243
// Unit tests for the `datadog_traces` sink configuration.
#[cfg(test)]
mod test {
    use super::DatadogTracesConfig;

    // Runs the shared `test_generate_config` helper against this sink's config type.
    // NOTE(review): presumably checks that the generated example config
    // deserializes back into `DatadogTracesConfig` — confirm against `test_util`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogTracesConfig>();
    }
}