vector/sinks/datadog/metrics/
config.rs

1use http::Uri;
2use snafu::ResultExt;
3use tower::ServiceBuilder;
4use vector_lib::{
5    config::proxy::ProxyConfig, configurable::configurable_component, stream::BatcherSettings,
6};
7
8use super::{
9    request_builder::DatadogMetricsRequestBuilder,
10    service::{DatadogMetricsRetryLogic, DatadogMetricsService},
11    sink::DatadogMetricsSink,
12};
13use crate::{
14    common::datadog,
15    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16    http::HttpClient,
17    sinks::{
18        Healthcheck, UriParseSnafu, VectorSink,
19        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20        util::{ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, batch::BatchConfig},
21    },
22    tls::{MaybeTlsSettings, TlsEnableableConfig},
23};
24#[derive(Clone, Copy, Debug, Default)]
25pub struct DatadogMetricsDefaultBatchSettings;
26
27impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
28    const MAX_EVENTS: Option<usize> = Some(100_000);
29    // No default byte cap here; the appropriate limit (v1: 60 MiB, v2: 5 MiB) is applied at
30    // sink build time based on the active series API version.
31    const MAX_BYTES: Option<usize> = None;
32    const TIMEOUT_SECS: f64 = 2.0;
33}
34
35pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
36pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
37pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
38
39/// The API version to use when submitting series metrics to Datadog.
40#[configurable_component]
41#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
42#[serde(rename_all = "snake_case")]
43pub enum SeriesApiVersion {
44    /// Use the v1 series endpoint (`/api/v1/series`).
45    ///
46    /// This is a legacy endpoint. Prefer `v2` unless you have a specific reason to use v1.
47    #[configurable(deprecated)]
48    V1,
49
50    /// Use the v2 series endpoint (`/api/v2/series`).
51    ///
52    /// This is the recommended and default endpoint.
53    #[default]
54    V2,
55}
56
57impl SeriesApiVersion {
58    pub const fn get_path(self) -> &'static str {
59        match self {
60            Self::V1 => SERIES_V1_PATH,
61            Self::V2 => SERIES_V2_PATH,
62        }
63    }
64}
65
66/// Various metric type-specific API types.
67///
68/// Each of these corresponds to a specific request path when making a request to the agent API.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
70pub enum DatadogMetricsEndpoint {
71    Series(SeriesApiVersion),
72    Sketches,
73}
74
75/// Payload limits for metrics are endpoint-dependent.
76pub(super) struct DatadogMetricsPayloadLimits {
77    pub(super) uncompressed: usize,
78    pub(super) compressed: usize,
79}
80
81impl DatadogMetricsEndpoint {
82    /// Gets the content type associated with the specific encoder for a given metric endpoint.
83    pub const fn content_type(self) -> &'static str {
84        match self {
85            Self::Series(SeriesApiVersion::V1) => "application/json",
86            Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
87        }
88    }
89
90    pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
91        // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
92        let (uncompressed, compressed) = match self {
93            // Sketches use the same payload size limits as v1 series
94            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
95            | DatadogMetricsEndpoint::Sketches => (
96                62_914_560, // 60 MiB
97                3_200_000,  // 3.2 MB
98            ),
99            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
100                5_242_880, // 5 MiB
101                512_000,   // 512 KB
102            ),
103        };
104
105        DatadogMetricsPayloadLimits {
106            uncompressed,
107            compressed,
108        }
109    }
110
111    /// Returns the compression scheme used for this endpoint.
112    pub(super) const fn compression(self) -> DatadogMetricsCompression {
113        match self {
114            Self::Series(SeriesApiVersion::V1) => DatadogMetricsCompression::Zlib,
115            _ => DatadogMetricsCompression::Zstd,
116        }
117    }
118}
119
120/// Selects the compressor for a given Datadog metrics endpoint.
121#[derive(Clone, Copy, Debug)]
122pub(super) enum DatadogMetricsCompression {
123    /// zlib (deflate) — used by Series v1.
124    Zlib,
125    /// zstd — used by Series v2 and Sketches.
126    Zstd,
127}
128
129impl DatadogMetricsCompression {
130    pub(super) const fn content_encoding(self) -> &'static str {
131        match self {
132            Self::Zstd => "zstd",
133            Self::Zlib => "deflate",
134        }
135    }
136}
137
138/// Maps Datadog metric endpoints to their actual URI.
139pub struct DatadogMetricsEndpointConfiguration {
140    series_endpoint: Uri,
141    sketches_endpoint: Uri,
142}
143
144impl DatadogMetricsEndpointConfiguration {
145    /// Creates a new `DatadogMEtricsEndpointConfiguration`.
146    pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
147        Self {
148            series_endpoint,
149            sketches_endpoint,
150        }
151    }
152
153    /// Gets the URI for the given Datadog metrics endpoint.
154    pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
155        match endpoint {
156            DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
157            DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
158        }
159    }
160}
161
162/// Configuration for the `datadog_metrics` sink.
163#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
164#[derive(Clone, Debug, Default)]
165#[serde(deny_unknown_fields)]
166pub struct DatadogMetricsConfig {
167    #[serde(flatten)]
168    pub local_dd_common: LocalDatadogCommonConfig,
169
170    /// Sets the default namespace for any metrics sent.
171    ///
172    /// This namespace is only used if a metric has no existing namespace. When a namespace is
173    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
174    #[configurable(metadata(docs::examples = "myservice"))]
175    #[serde(default)]
176    pub default_namespace: Option<String>,
177
178    /// Controls which Datadog series API endpoint is used to submit metrics.
179    ///
180    /// Defaults to `v2` (`/api/v2/series`). Set to `v1` (`/api/v1/series`) only if you need to
181    /// fall back to the legacy endpoint.
182    #[serde(default)]
183    pub series_api_version: SeriesApiVersion,
184
185    #[configurable(derived)]
186    #[serde(default)]
187    pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
188
189    #[configurable(derived)]
190    #[serde(default)]
191    pub request: TowerRequestConfig,
192}
193
194impl_generate_config_from_default!(DatadogMetricsConfig);
195
196#[async_trait::async_trait]
197#[typetag::serde(name = "datadog_metrics")]
198impl SinkConfig for DatadogMetricsConfig {
199    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
200        let client = self.build_client(&cx.proxy)?;
201        let global = cx.extra_context.get_or_default::<datadog::Options>();
202        let dd_common = self.local_dd_common.with_globals(global)?;
203        let healthcheck = dd_common.build_healthcheck(client.clone())?;
204        let sink = self.build_sink(&dd_common, client)?;
205
206        Ok((sink, healthcheck))
207    }
208
209    fn input(&self) -> Input {
210        Input::metric()
211    }
212
213    fn acknowledgements(&self) -> &AcknowledgementsConfig {
214        &self.local_dd_common.acknowledgements
215    }
216}
217
218impl DatadogMetricsConfig {
219    /// Gets the base URI of the Datadog agent API.
220    ///
221    /// Per the Datadog agent convention, we should include a unique identifier as part of the
222    /// domain to indicate that these metrics are being submitted by Vector, including the version,
223    /// likely useful for detecting if a specific version of the agent (Vector, in this case) is
224    /// doing something wrong, for understanding issues from the API side.
225    ///
226    /// The `endpoint` configuration field will be used here if it is present.
227    fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
228        dd_common.endpoint.clone().unwrap_or_else(|| {
229            let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
230            format!(
231                "https://{}-vector.agent.{}",
232                version,
233                dd_common.site.as_str()
234            )
235        })
236    }
237
238    /// Generates the `DatadogMetricsEndpointConfiguration`, used for mapping endpoints to their URI.
239    fn generate_metrics_endpoint_configuration(
240        &self,
241        dd_common: &DatadogCommonConfig,
242    ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
243        let base_uri = self.get_base_agent_endpoint(dd_common);
244
245        let series_endpoint = build_uri(&base_uri, self.series_api_version.get_path())?;
246        let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
247
248        Ok(DatadogMetricsEndpointConfiguration::new(
249            series_endpoint,
250            sketches_endpoint,
251        ))
252    }
253
254    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
255        let default_tls_config;
256
257        let tls_settings = MaybeTlsSettings::from_config(
258            Some(match self.local_dd_common.tls.as_ref() {
259                Some(config) => config,
260                None => {
261                    default_tls_config = TlsEnableableConfig::enabled();
262                    &default_tls_config
263                }
264            }),
265            false,
266        )?;
267        let client = HttpClient::new(tls_settings, proxy)?;
268        Ok(client)
269    }
270
271    fn build_sink(
272        &self,
273        dd_common: &DatadogCommonConfig,
274        client: HttpClient,
275    ) -> crate::Result<VectorSink> {
276        let (batcher_settings, sketches_batcher_settings) =
277            resolve_endpoint_batch_settings(self.batch, self.series_api_version)?;
278
279        // TODO: revisit our concurrency and batching defaults
280        let request_limits = self.request.into_settings();
281
282        let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
283        let service = ServiceBuilder::new()
284            .settings(request_limits, DatadogMetricsRetryLogic)
285            .service(DatadogMetricsService::new(
286                client,
287                dd_common.default_api_key.inner(),
288            ));
289
290        let request_builder = DatadogMetricsRequestBuilder::new(
291            endpoint_configuration,
292            self.default_namespace.clone(),
293            self.series_api_version,
294        );
295
296        let protocol = self.get_protocol(dd_common);
297        let sink = DatadogMetricsSink::new(
298            service,
299            request_builder,
300            batcher_settings,
301            sketches_batcher_settings,
302            protocol,
303            self.series_api_version,
304        );
305
306        Ok(VectorSink::from_event_streamsink(sink))
307    }
308
309    fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
310        self.get_base_agent_endpoint(dd_common)
311            .parse::<Uri>()
312            .unwrap()
313            .scheme_str()
314            .unwrap_or("http")
315            .to_string()
316    }
317}
318
319/// Returns `(series_settings, sketches_settings)`.
320///
321/// When the user has not set an explicit `max_bytes`, each endpoint is capped to its own
322/// uncompressed payload limit (5 MiB for Series v2, 60 MiB for Sketches). When an explicit
323/// limit is configured, both endpoints share it.
324fn resolve_endpoint_batch_settings(
325    batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
326    series_version: SeriesApiVersion,
327) -> crate::Result<(BatcherSettings, BatcherSettings)> {
328    let mut series = batch.into_batcher_settings()?;
329    let mut sketches = series;
330    if series.size_limit == usize::MAX {
331        series.size_limit = DatadogMetricsEndpoint::Series(series_version)
332            .payload_limits()
333            .uncompressed;
334        sketches.size_limit = DatadogMetricsEndpoint::Sketches
335            .payload_limits()
336            .uncompressed;
337    }
338    Ok((series, sketches))
339}
340
341fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
342    let result = format!("{host}{endpoint}")
343        .parse::<Uri>()
344        .context(UriParseSnafu)?;
345    Ok(result)
346}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Series v2 uncompressed payload cap (5 MiB).
    const SERIES_V2_LIMIT: usize = 5_242_880;
    /// Series v1 and Sketches uncompressed payload cap (60 MiB).
    const SERIES_V1_AND_SKETCHES_LIMIT: usize = 62_914_560;

    /// Resolves `batch` for `version` and returns `(series_size_limit, sketches_size_limit)`.
    fn size_limits(
        batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
        version: SeriesApiVersion,
    ) -> (usize, usize) {
        let (series, sketches) = resolve_endpoint_batch_settings(batch, version).unwrap();
        (series.size_limit, sketches.size_limit)
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogMetricsConfig>();
    }

    // With `max_bytes` unset, each endpoint is capped at its own API payload limit.
    #[test]
    fn default_batch_config_uses_endpoint_specific_size_limits() {
        let limits = size_limits(BatchConfig::default(), SeriesApiVersion::V2);
        assert_eq!(limits, (SERIES_V2_LIMIT, SERIES_V1_AND_SKETCHES_LIMIT));
    }

    #[test]
    fn v1_batch_config_uses_v1_size_limit() {
        let limits = size_limits(BatchConfig::default(), SeriesApiVersion::V1);
        assert_eq!(
            limits,
            (SERIES_V1_AND_SKETCHES_LIMIT, SERIES_V1_AND_SKETCHES_LIMIT)
        );
    }

    // An explicit `max_bytes` is shared, unchanged, by both endpoints.
    #[test]
    fn explicit_max_bytes_applies_to_both_endpoints() {
        let mut batch = BatchConfig::<DatadogMetricsDefaultBatchSettings>::default();
        batch.max_bytes = Some(1_000_000);

        assert_eq!(
            size_limits(batch, SeriesApiVersion::V2),
            (1_000_000, 1_000_000)
        );
    }
}