vector/sinks/datadog/metrics/
config.rs1use http::Uri;
2use snafu::ResultExt;
3use tower::ServiceBuilder;
4use vector_lib::{
5 config::proxy::ProxyConfig, configurable::configurable_component, stream::BatcherSettings,
6};
7
8use super::{
9 request_builder::DatadogMetricsRequestBuilder,
10 service::{DatadogMetricsRetryLogic, DatadogMetricsService},
11 sink::DatadogMetricsSink,
12};
13use crate::{
14 common::datadog,
15 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16 http::HttpClient,
17 sinks::{
18 Healthcheck, UriParseSnafu, VectorSink,
19 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20 util::{ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, batch::BatchConfig},
21 },
22 tls::{MaybeTlsSettings, TlsEnableableConfig},
23};
/// Marker type carrying the default batch settings of the Datadog metrics sink.
///
/// It is used as the type parameter of `BatchConfig` in `DatadogMetricsConfig::batch`.
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogMetricsDefaultBatchSettings;

impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
    /// By default a batch holds at most 100,000 events.
    const MAX_EVENTS: Option<usize> = Some(100_000);
    /// No default byte limit is declared here. The tests in this file show that, with the
    /// default batch config, `resolve_endpoint_batch_settings` ends up applying the
    /// per-endpoint uncompressed payload limits instead.
    const MAX_BYTES: Option<usize> = None;
    /// Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 2.0;
}
34
/// Request path of the v1 series API.
pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
/// Request path of the v2 series API.
pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
/// Request path of the sketches API.
pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
38
/// Version of the series API used when sending series metrics.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum SeriesApiVersion {
    /// The v1 series API (`/api/v1/series`).
    ///
    /// This variant is marked as deprecated.
    #[configurable(deprecated)]
    V1,

    /// The v2 series API (`/api/v2/series`).
    ///
    /// This is the default.
    #[default]
    V2,
}
56
57impl SeriesApiVersion {
58 pub const fn get_path(self) -> &'static str {
59 match self {
60 Self::V1 => SERIES_V1_PATH,
61 Self::V2 => SERIES_V2_PATH,
62 }
63 }
64}
65
/// The Datadog endpoints a metrics request can be sent to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DatadogMetricsEndpoint {
    /// The series endpoint, for the given series API version.
    Series(SeriesApiVersion),
    /// The sketches endpoint.
    Sketches,
}
74
/// Payload size limits of a single endpoint, as returned by
/// `DatadogMetricsEndpoint::payload_limits`.
// NOTE(review): both values are presumably byte counts (the uncompressed one is used as a
// batcher `size_limit`, which is also what `max_bytes` feeds) — confirm.
pub(super) struct DatadogMetricsPayloadLimits {
    /// Limit on the payload size before compression.
    pub(super) uncompressed: usize,
    /// Limit on the payload size after compression.
    pub(super) compressed: usize,
}
80
81impl DatadogMetricsEndpoint {
82 pub const fn content_type(self) -> &'static str {
84 match self {
85 Self::Series(SeriesApiVersion::V1) => "application/json",
86 Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
87 }
88 }
89
90 pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
91 let (uncompressed, compressed) = match self {
93 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
95 | DatadogMetricsEndpoint::Sketches => (
96 62_914_560, 3_200_000, ),
99 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
100 5_242_880, 512_000, ),
103 };
104
105 DatadogMetricsPayloadLimits {
106 uncompressed,
107 compressed,
108 }
109 }
110
111 pub(super) const fn compression(self) -> DatadogMetricsCompression {
113 match self {
114 Self::Series(SeriesApiVersion::V1) => DatadogMetricsCompression::Zlib,
115 _ => DatadogMetricsCompression::Zstd,
116 }
117 }
118}
119
/// Compression schemes used for metrics payloads.
#[derive(Clone, Copy, Debug)]
pub(super) enum DatadogMetricsCompression {
    /// zlib compression; `content_encoding` reports it as `deflate`.
    Zlib,
    /// zstd compression; `content_encoding` reports it as `zstd`.
    Zstd,
}
128
129impl DatadogMetricsCompression {
130 pub(super) const fn content_encoding(self) -> &'static str {
131 match self {
132 Self::Zstd => "zstd",
133 Self::Zlib => "deflate",
134 }
135 }
136}
137
/// Holds the fully resolved URI of each metrics endpoint.
pub struct DatadogMetricsEndpointConfiguration {
    /// URI used for every `DatadogMetricsEndpoint::Series` request.
    series_endpoint: Uri,
    /// URI used for `DatadogMetricsEndpoint::Sketches` requests.
    sketches_endpoint: Uri,
}
143
144impl DatadogMetricsEndpointConfiguration {
145 pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
147 Self {
148 series_endpoint,
149 sketches_endpoint,
150 }
151 }
152
153 pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
155 match endpoint {
156 DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
157 DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
158 }
159 }
160}
161
/// Configuration for the `datadog_metrics` sink.
#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogMetricsConfig {
    // Flattened, so the common Datadog options are read from the same level as the
    // sink-specific options below.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    /// Sets the default namespace passed to the request builder for the metrics sent.
    // NOTE(review): presumably only applied to metrics that carry no namespace of their
    // own — confirm against the request builder.
    #[configurable(metadata(docs::examples = "myservice"))]
    #[serde(default)]
    pub default_namespace: Option<String>,

    /// Sets the series API version used for series metrics.
    ///
    /// The version determines the request path, content type, payload limits, and
    /// compression used for series requests.
    #[serde(default)]
    pub series_api_version: SeriesApiVersion,

    // Batching options; defaults come from `DatadogMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,

    // Request options, converted into service settings in `build_sink`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
}
193
// Presumably derives the sink's generated example configuration from the `Default` impl of
// `DatadogMetricsConfig` (the macro is defined elsewhere); exercised by the
// `generate_config` test in this file.
impl_generate_config_from_default!(DatadogMetricsConfig);
195
196#[async_trait::async_trait]
197#[typetag::serde(name = "datadog_metrics")]
198impl SinkConfig for DatadogMetricsConfig {
199 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
200 let client = self.build_client(&cx.proxy)?;
201 let global = cx.extra_context.get_or_default::<datadog::Options>();
202 let dd_common = self.local_dd_common.with_globals(global)?;
203 let healthcheck = dd_common.build_healthcheck(client.clone())?;
204 let sink = self.build_sink(&dd_common, client)?;
205
206 Ok((sink, healthcheck))
207 }
208
209 fn input(&self) -> Input {
210 Input::metric()
211 }
212
213 fn acknowledgements(&self) -> &AcknowledgementsConfig {
214 &self.local_dd_common.acknowledgements
215 }
216}
217
218impl DatadogMetricsConfig {
219 fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
228 dd_common.endpoint.clone().unwrap_or_else(|| {
229 let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
230 format!(
231 "https://{}-vector.agent.{}",
232 version,
233 dd_common.site.as_str()
234 )
235 })
236 }
237
238 fn generate_metrics_endpoint_configuration(
240 &self,
241 dd_common: &DatadogCommonConfig,
242 ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
243 let base_uri = self.get_base_agent_endpoint(dd_common);
244
245 let series_endpoint = build_uri(&base_uri, self.series_api_version.get_path())?;
246 let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
247
248 Ok(DatadogMetricsEndpointConfiguration::new(
249 series_endpoint,
250 sketches_endpoint,
251 ))
252 }
253
254 fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
255 let default_tls_config;
256
257 let tls_settings = MaybeTlsSettings::from_config(
258 Some(match self.local_dd_common.tls.as_ref() {
259 Some(config) => config,
260 None => {
261 default_tls_config = TlsEnableableConfig::enabled();
262 &default_tls_config
263 }
264 }),
265 false,
266 )?;
267 let client = HttpClient::new(tls_settings, proxy)?;
268 Ok(client)
269 }
270
271 fn build_sink(
272 &self,
273 dd_common: &DatadogCommonConfig,
274 client: HttpClient,
275 ) -> crate::Result<VectorSink> {
276 let (batcher_settings, sketches_batcher_settings) =
277 resolve_endpoint_batch_settings(self.batch, self.series_api_version)?;
278
279 let request_limits = self.request.into_settings();
281
282 let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
283 let service = ServiceBuilder::new()
284 .settings(request_limits, DatadogMetricsRetryLogic)
285 .service(DatadogMetricsService::new(
286 client,
287 dd_common.default_api_key.inner(),
288 ));
289
290 let request_builder = DatadogMetricsRequestBuilder::new(
291 endpoint_configuration,
292 self.default_namespace.clone(),
293 self.series_api_version,
294 );
295
296 let protocol = self.get_protocol(dd_common);
297 let sink = DatadogMetricsSink::new(
298 service,
299 request_builder,
300 batcher_settings,
301 sketches_batcher_settings,
302 protocol,
303 self.series_api_version,
304 );
305
306 Ok(VectorSink::from_event_streamsink(sink))
307 }
308
309 fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
310 self.get_base_agent_endpoint(dd_common)
311 .parse::<Uri>()
312 .unwrap()
313 .scheme_str()
314 .unwrap_or("http")
315 .to_string()
316 }
317}
318
319fn resolve_endpoint_batch_settings(
325 batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
326 series_version: SeriesApiVersion,
327) -> crate::Result<(BatcherSettings, BatcherSettings)> {
328 let mut series = batch.into_batcher_settings()?;
329 let mut sketches = series;
330 if series.size_limit == usize::MAX {
331 series.size_limit = DatadogMetricsEndpoint::Series(series_version)
332 .payload_limits()
333 .uncompressed;
334 sketches.size_limit = DatadogMetricsEndpoint::Sketches
335 .payload_limits()
336 .uncompressed;
337 }
338 Ok((series, sketches))
339}
340
341fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
342 let result = format!("{host}{endpoint}")
343 .parse::<Uri>()
344 .context(UriParseSnafu)?;
345 Ok(result)
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated example configuration must be valid for this sink.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogMetricsConfig>();
    }

    /// With the default batch config, each endpoint gets its own uncompressed limit.
    #[test]
    fn default_batch_config_uses_endpoint_specific_size_limits() {
        let resolved =
            resolve_endpoint_batch_settings(BatchConfig::default(), SeriesApiVersion::V2);
        let (series, sketches) = resolved.unwrap();

        // Series v2 limit.
        assert_eq!(series.size_limit, 5_242_880);
        // Sketches limit.
        assert_eq!(sketches.size_limit, 62_914_560);
    }

    /// Series v1 shares its uncompressed limit with sketches.
    #[test]
    fn v1_batch_config_uses_v1_size_limit() {
        let resolved =
            resolve_endpoint_batch_settings(BatchConfig::default(), SeriesApiVersion::V1);
        let (series, sketches) = resolved.unwrap();

        // Series v1 limit.
        assert_eq!(series.size_limit, 62_914_560);
        // Sketches limit.
        assert_eq!(sketches.size_limit, 62_914_560);
    }

    /// An explicit `max_bytes` overrides the per-endpoint defaults for both endpoints.
    #[test]
    fn explicit_max_bytes_applies_to_both_endpoints() {
        let mut batch = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default();
        batch.max_bytes = Some(1_000_000);

        let resolved = resolve_endpoint_batch_settings(batch, SeriesApiVersion::V2);
        let (series, sketches) = resolved.unwrap();

        assert_eq!(series.size_limit, 1_000_000);
        assert_eq!(sketches.size_limit, 1_000_000);
    }
}