vector/sinks/datadog/traces/
config.rs1use std::sync::{Arc, Mutex};
2
3use http::Uri;
4use indoc::indoc;
5use snafu::ResultExt;
6use tokio::sync::oneshot::{channel, Sender};
7use tower::ServiceBuilder;
8use vector_lib::config::{proxy::ProxyConfig, AcknowledgementsConfig};
9use vector_lib::configurable::configurable_component;
10
11use super::{
12 apm_stats::{flush_apm_stats_thread, Aggregator},
13 service::TraceApiRetry,
14};
15use crate::common::datadog;
16use crate::{
17 config::{GenerateConfig, Input, SinkConfig, SinkContext},
18 http::HttpClient,
19 sinks::{
20 datadog::{
21 traces::{
22 request_builder::DatadogTracesRequestBuilder, service::TraceApiService,
23 sink::TracesSink,
24 },
25 DatadogCommonConfig, LocalDatadogCommonConfig,
26 },
27 util::{
28 service::ServiceBuilderExt, BatchConfig, Compression, SinkBatchSettings,
29 TowerRequestConfig,
30 },
31 Healthcheck, UriParseSnafu, VectorSink,
32 },
33 tls::{MaybeTlsSettings, TlsEnableableConfig},
34};
35
/// Byte-size goal for a batch of traces.
///
/// Used both as the default `MAX_BYTES` batch setting and as the upper bound
/// enforced on user-supplied batch configuration when the sink is built.
pub const BATCH_GOAL_BYTES: usize = 3_000_000;
/// Maximum number of events in a batch.
///
/// Used both as the default `MAX_EVENTS` batch setting and as the upper bound
/// enforced on user-supplied batch configuration when the sink is built.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0;

/// Payload size limit, in bytes, handed to the traces request builder.
pub const PAYLOAD_LIMIT: usize = 3_200_000;
44
45#[derive(Clone, Copy, Debug, Default)]
46pub struct DatadogTracesDefaultBatchSettings;
47
48impl SinkBatchSettings for DatadogTracesDefaultBatchSettings {
49 const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
50 const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
51 const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
52}
53
54#[configurable_component(sink("datadog_traces", "Publish trace events to Datadog."))]
56#[derive(Clone, Debug, Default)]
57#[serde(deny_unknown_fields)]
58pub struct DatadogTracesConfig {
59 #[serde(flatten)]
60 pub local_dd_common: LocalDatadogCommonConfig,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub compression: Option<Compression>,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub batch: BatchConfig<DatadogTracesDefaultBatchSettings>,
69
70 #[configurable(derived)]
71 #[serde(default)]
72 pub request: TowerRequestConfig,
73}
74
75impl GenerateConfig for DatadogTracesConfig {
76 fn generate_config() -> toml::Value {
77 toml::from_str(indoc! {r#"
78 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
79 "#})
80 .unwrap()
81 }
82}
83
/// Identifies which of the sink's two endpoints a request targets.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DatadogTracesEndpoint {
    /// The traces intake endpoint.
    Traces,
    /// The APM stats intake endpoint.
    // NOTE(review): the lint is silenced because nothing in this file
    // constructs the variant; confirm whether other modules do.
    #[allow(dead_code)]
    APMStats,
}
91
92#[derive(Clone)]
94pub struct DatadogTracesEndpointConfiguration {
95 traces_endpoint: Uri,
96 stats_endpoint: Uri,
97}
98
99impl DatadogTracesEndpointConfiguration {
100 pub fn get_uri_for_endpoint(&self, endpoint: DatadogTracesEndpoint) -> Uri {
101 match endpoint {
102 DatadogTracesEndpoint::Traces => self.traces_endpoint.clone(),
103 DatadogTracesEndpoint::APMStats => self.stats_endpoint.clone(),
104 }
105 }
106}
107
108impl DatadogTracesConfig {
109 fn get_base_uri(&self, dd_common: &DatadogCommonConfig) -> String {
110 dd_common
111 .endpoint
112 .clone()
113 .unwrap_or_else(|| format!("https://trace.agent.{}", dd_common.site))
114 }
115
116 fn generate_traces_endpoint_configuration(
117 &self,
118 dd_common: &DatadogCommonConfig,
119 ) -> crate::Result<DatadogTracesEndpointConfiguration> {
120 let base_uri = self.get_base_uri(dd_common);
121 let traces_endpoint = build_uri(&base_uri, "/api/v0.2/traces")?;
122 let stats_endpoint = build_uri(&base_uri, "/api/v0.2/stats")?;
123
124 Ok(DatadogTracesEndpointConfiguration {
125 traces_endpoint,
126 stats_endpoint,
127 })
128 }
129
130 pub fn build_sink(
131 &self,
132 dd_common: &DatadogCommonConfig,
133 client: HttpClient,
134 ) -> crate::Result<VectorSink> {
135 let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
136 let request_limits = self.request.into_settings();
137 let endpoints = self.generate_traces_endpoint_configuration(dd_common)?;
138
139 let batcher_settings = self
140 .batch
141 .validate()?
142 .limit_max_bytes(BATCH_GOAL_BYTES)?
143 .limit_max_events(BATCH_MAX_EVENTS)?
144 .into_batcher_settings()?;
145
146 let service = ServiceBuilder::new()
147 .settings(request_limits, TraceApiRetry)
148 .service(TraceApiService::new(client.clone()));
149
150 let apm_stats_aggregator =
152 Arc::new(Mutex::new(Aggregator::new(Arc::clone(&default_api_key))));
153
154 let compression = self.compression.unwrap_or_else(Compression::gzip_default);
155
156 let request_builder = DatadogTracesRequestBuilder::new(
157 Arc::clone(&default_api_key),
158 endpoints.clone(),
159 compression,
160 PAYLOAD_LIMIT,
161 Arc::clone(&apm_stats_aggregator),
162 )?;
163
164 let (shutdown, tripwire) = channel::<Sender<()>>();
167
168 let sink = TracesSink::new(
169 service,
170 request_builder,
171 batcher_settings,
172 shutdown,
173 self.get_protocol(dd_common),
174 );
175
176 tokio::spawn(flush_apm_stats_thread(
180 tripwire,
181 client,
182 compression,
183 endpoints,
184 Arc::clone(&apm_stats_aggregator),
185 ));
186
187 Ok(VectorSink::from_event_streamsink(sink))
188 }
189
190 pub fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
191 let default_tls_config;
192
193 let tls_settings = MaybeTlsSettings::from_config(
194 Some(match self.local_dd_common.tls.as_ref() {
195 Some(config) => config,
196 None => {
197 default_tls_config = TlsEnableableConfig::enabled();
198 &default_tls_config
199 }
200 }),
201 false,
202 )?;
203 Ok(HttpClient::new(tls_settings, proxy)?)
204 }
205
206 fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
207 build_uri(&self.get_base_uri(dd_common), "")
208 .unwrap()
209 .scheme_str()
210 .unwrap_or("http")
211 .to_string()
212 }
213}
214
215#[async_trait::async_trait]
216#[typetag::serde(name = "datadog_traces")]
217impl SinkConfig for DatadogTracesConfig {
218 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
219 let client = self.build_client(&cx.proxy)?;
220 let global = cx.extra_context.get_or_default::<datadog::Options>();
221 let dd_common = self.local_dd_common.with_globals(global)?;
222 let healthcheck = dd_common.build_healthcheck(client.clone())?;
223 let sink = self.build_sink(&dd_common, client)?;
224
225 Ok((sink, healthcheck))
226 }
227
228 fn input(&self) -> Input {
229 Input::trace()
230 }
231
232 fn acknowledgements(&self) -> &AcknowledgementsConfig {
233 &self.local_dd_common.acknowledgements
234 }
235}
236
237fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
238 let result = format!("{host}{endpoint}")
239 .parse::<Uri>()
240 .context(UriParseSnafu)?;
241 Ok(result)
242}
243
#[cfg(test)]
mod test {
    use crate::test_util::test_generate_config;

    /// Runs the shared generate-config test helper against this sink's configuration.
    #[test]
    fn generate_config() {
        test_generate_config::<super::DatadogTracesConfig>();
    }
}