vector/sinks/datadog/traces/
config.rs1use std::sync::{Arc, Mutex};
2
3use http::Uri;
4use indoc::indoc;
5use snafu::ResultExt;
6use tokio::sync::oneshot::{Sender, channel};
7use tower::ServiceBuilder;
8use vector_lib::{
9 config::{AcknowledgementsConfig, proxy::ProxyConfig},
10 configurable::configurable_component,
11};
12
13use super::{
14 apm_stats::{Aggregator, flush_apm_stats_thread},
15 service::TraceApiRetry,
16};
17use crate::{
18 common::datadog,
19 config::{GenerateConfig, Input, SinkConfig, SinkContext},
20 http::HttpClient,
21 sinks::{
22 Healthcheck, UriParseSnafu, VectorSink,
23 datadog::{
24 DatadogCommonConfig, LocalDatadogCommonConfig,
25 traces::{
26 request_builder::DatadogTracesRequestBuilder, service::TraceApiService,
27 sink::TracesSink,
28 },
29 },
30 util::{
31 BatchConfig, Compression, SinkBatchSettings, TowerRequestConfig,
32 service::ServiceBuilderExt,
33 },
34 },
35 tls::{MaybeTlsSettings, TlsEnableableConfig},
36};
37
38pub const BATCH_GOAL_BYTES: usize = 3_000_000;
42pub const BATCH_MAX_EVENTS: usize = 1_000;
43pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;
44
45pub const PAYLOAD_LIMIT: usize = 3_200_000;
46
47#[derive(Clone, Copy, Debug, Default)]
48pub struct DatadogTracesDefaultBatchSettings;
49
50impl SinkBatchSettings for DatadogTracesDefaultBatchSettings {
51 const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
52 const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
53 const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
54}
55
56#[configurable_component(sink("datadog_traces", "Publish trace events to Datadog."))]
58#[derive(Clone, Debug, Default)]
59#[serde(deny_unknown_fields)]
60pub struct DatadogTracesConfig {
61 #[serde(flatten)]
62 pub local_dd_common: LocalDatadogCommonConfig,
63
64 #[configurable(derived)]
65 #[serde(default)]
66 pub compression: Option<Compression>,
67
68 #[configurable(derived)]
69 #[serde(default)]
70 pub batch: BatchConfig<DatadogTracesDefaultBatchSettings>,
71
72 #[configurable(derived)]
73 #[serde(default)]
74 pub request: TowerRequestConfig,
75}
76
77impl GenerateConfig for DatadogTracesConfig {
78 fn generate_config() -> toml::Value {
79 toml::from_str(indoc! {r#"
80 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
81 "#})
82 .unwrap()
83 }
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
88pub enum DatadogTracesEndpoint {
89 Traces,
90 #[allow(dead_code)] APMStats,
92}
93
94#[derive(Clone)]
96pub struct DatadogTracesEndpointConfiguration {
97 traces_endpoint: Uri,
98 stats_endpoint: Uri,
99}
100
101impl DatadogTracesEndpointConfiguration {
102 pub fn get_uri_for_endpoint(&self, endpoint: DatadogTracesEndpoint) -> Uri {
103 match endpoint {
104 DatadogTracesEndpoint::Traces => self.traces_endpoint.clone(),
105 DatadogTracesEndpoint::APMStats => self.stats_endpoint.clone(),
106 }
107 }
108}
109
110impl DatadogTracesConfig {
111 fn get_base_uri(&self, dd_common: &DatadogCommonConfig) -> String {
112 dd_common
113 .endpoint
114 .clone()
115 .unwrap_or_else(|| format!("https://trace.agent.{}", dd_common.site))
116 }
117
118 fn generate_traces_endpoint_configuration(
119 &self,
120 dd_common: &DatadogCommonConfig,
121 ) -> crate::Result<DatadogTracesEndpointConfiguration> {
122 let base_uri = self.get_base_uri(dd_common);
123 let traces_endpoint = build_uri(&base_uri, "/api/v0.2/traces")?;
124 let stats_endpoint = build_uri(&base_uri, "/api/v0.2/stats")?;
125
126 Ok(DatadogTracesEndpointConfiguration {
127 traces_endpoint,
128 stats_endpoint,
129 })
130 }
131
132 pub fn build_sink(
133 &self,
134 dd_common: &DatadogCommonConfig,
135 client: HttpClient,
136 ) -> crate::Result<VectorSink> {
137 let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
138 let request_limits = self.request.into_settings();
139 let endpoints = self.generate_traces_endpoint_configuration(dd_common)?;
140
141 let batcher_settings = self
142 .batch
143 .validate()?
144 .limit_max_bytes(BATCH_GOAL_BYTES)?
145 .limit_max_events(BATCH_MAX_EVENTS)?
146 .into_batcher_settings()?;
147
148 let service = ServiceBuilder::new()
149 .settings(request_limits, TraceApiRetry)
150 .service(TraceApiService::new(client.clone()));
151
152 let apm_stats_aggregator =
154 Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key))));
155
156 let compression = self.compression.unwrap_or_else(Compression::gzip_default);
157
158 let request_builder = DatadogTracesRequestBuilder::new(
159 Arc::clone(&default_api_key),
160 endpoints.clone(),
161 compression,
162 PAYLOAD_LIMIT,
163 Arc::clone(&apm_stats_aggregator),
164 )?;
165
166 let (shutdown, tripwire) = channel::<Sender<()>>();
169
170 let sink = TracesSink::new(
171 service,
172 request_builder,
173 batcher_settings,
174 shutdown,
175 self.get_protocol(dd_common),
176 );
177
178 tokio::spawn(flush_apm_stats_thread(
182 tripwire,
183 client,
184 compression,
185 endpoints,
186 Arc::clone(&apm_stats_aggregator),
187 ));
188
189 Ok(VectorSink::from_event_streamsink(sink))
190 }
191
192 pub fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
193 let default_tls_config;
194
195 let tls_settings = MaybeTlsSettings::from_config(
196 Some(match self.local_dd_common.tls.as_ref() {
197 Some(config) => config,
198 None => {
199 default_tls_config = TlsEnableableConfig::enabled();
200 &default_tls_config
201 }
202 }),
203 false,
204 )?;
205 Ok(HttpClient::new(tls_settings, proxy)?)
206 }
207
208 fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
209 build_uri(&self.get_base_uri(dd_common), "")
210 .unwrap()
211 .scheme_str()
212 .unwrap_or("http")
213 .to_string()
214 }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "datadog_traces")]
219impl SinkConfig for DatadogTracesConfig {
220 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
221 let client = self.build_client(&cx.proxy)?;
222 let global = cx.extra_context.get_or_default::<datadog::Options>();
223 let dd_common = self.local_dd_common.with_globals(global)?;
224 let healthcheck = dd_common.build_healthcheck(client.clone())?;
225 let sink = self.build_sink(&dd_common, client)?;
226
227 Ok((sink, healthcheck))
228 }
229
230 fn input(&self) -> Input {
231 Input::trace()
232 }
233
234 fn acknowledgements(&self) -> &AcknowledgementsConfig {
235 &self.local_dd_common.acknowledgements
236 }
237}
238
239fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
240 let result = format!("{host}{endpoint}")
241 .parse::<Uri>()
242 .context(UriParseSnafu)?;
243 Ok(result)
244}
245
246#[cfg(test)]
247mod test {
248 use super::DatadogTracesConfig;
249
250 #[test]
251 fn generate_config() {
252 crate::test_util::test_generate_config::<DatadogTracesConfig>();
253 }
254}