vector/sinks/datadog/metrics/
config.rs

1use std::sync::OnceLock;
2
3use http::Uri;
4use snafu::ResultExt;
5use tower::ServiceBuilder;
6use vector_lib::config::proxy::ProxyConfig;
7use vector_lib::configurable::configurable_component;
8
9use super::{
10    request_builder::DatadogMetricsRequestBuilder,
11    service::{DatadogMetricsRetryLogic, DatadogMetricsService},
12    sink::DatadogMetricsSink,
13};
14use crate::{
15    common::datadog,
16    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
17    http::HttpClient,
18    sinks::{
19        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20        util::{batch::BatchConfig, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig},
21        Healthcheck, UriParseSnafu, VectorSink,
22    },
23    tls::{MaybeTlsSettings, TlsEnableableConfig},
24};
25
/// Marker type that carries the default batching parameters of the `datadog_metrics` sink.
///
/// The type holds no data; it exists so the defaults can be attached to it through a
/// `SinkBatchSettings` implementation and selected as a type parameter of `BatchConfig`.
#[derive(Debug, Default, Clone, Copy)]
pub struct DatadogMetricsDefaultBatchSettings;
28
29// This default is centered around "series" data, which should be the lion's share of what we
30// process.  Given that a single series, when encoded, is in the 150-300 byte range, we can fit a
31// lot of these into a single request, something like 150-200K series.  Simply to be a little more
32// conservative, though, we use 100K here.  This will also get a little more tricky when it comes to
33// distributions and sketches, but we're going to have to implement incremental encoding to handle
34// "we've exceeded our maximum payload size, split this batch" scenarios anyways.
35impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
36    const MAX_EVENTS: Option<usize> = Some(100_000);
37    const MAX_BYTES: Option<usize> = None;
38    const TIMEOUT_SECS: f64 = 2.0;
39}
40
41pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
42pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
43pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
44
/// Version of the Datadog series API that series metrics are submitted to.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum SeriesApiVersion {
    /// The v1 series API.
    V1,
    /// The v2 series API.
    V2,
}
50
51impl SeriesApiVersion {
52    pub const fn get_path(self) -> &'static str {
53        match self {
54            Self::V1 => SERIES_V1_PATH,
55            Self::V2 => SERIES_V2_PATH,
56        }
57    }
58    fn get_api_version() -> Self {
59        static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
60        *API_VERSION.get_or_init(|| {
61            match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
62                Ok(_) => Self::V2,
63                Err(_) => Self::V1,
64            }
65        })
66    }
67}
68
69/// Various metric type-specific API types.
70///
71/// Each of these corresponds to a specific request path when making a request to the agent API.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73pub enum DatadogMetricsEndpoint {
74    Series(SeriesApiVersion),
75    Sketches,
76}
77
78/// Payload limits for metrics are endpoint-dependent.
79pub(super) struct DatadogMetricsPayloadLimits {
80    pub(super) uncompressed: usize,
81    pub(super) compressed: usize,
82}
83
84impl DatadogMetricsEndpoint {
85    /// Gets the content type associated with the specific encoder for a given metric endpoint.
86    pub const fn content_type(self) -> &'static str {
87        match self {
88            Self::Series(SeriesApiVersion::V1) => "application/json",
89            Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
90        }
91    }
92
93    // Gets whether or not this is a series endpoint.
94    pub const fn is_series(self) -> bool {
95        matches!(self, Self::Series { .. })
96    }
97
98    // Creates an instance of the `Series` variant with the default API version.
99    pub fn series() -> Self {
100        Self::Series(SeriesApiVersion::get_api_version())
101    }
102
103    pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
104        // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
105
106        let (uncompressed, compressed) = match self {
107            // Sketches use the same payload size limits as v1 series
108            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
109            | DatadogMetricsEndpoint::Sketches => (
110                62_914_560, // 60 MiB
111                3_200_000,  // 3.2 MB
112            ),
113            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
114                5_242_880, // 5 MiB
115                512_000,   // 512 KB
116            ),
117        };
118
119        DatadogMetricsPayloadLimits {
120            uncompressed,
121            compressed,
122        }
123    }
124}
125
126/// Maps Datadog metric endpoints to their actual URI.
127pub struct DatadogMetricsEndpointConfiguration {
128    series_endpoint: Uri,
129    sketches_endpoint: Uri,
130}
131
132impl DatadogMetricsEndpointConfiguration {
133    /// Creates a new `DatadogMEtricsEndpointConfiguration`.
134    pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
135        Self {
136            series_endpoint,
137            sketches_endpoint,
138        }
139    }
140
141    /// Gets the URI for the given Datadog metrics endpoint.
142    pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
143        match endpoint {
144            DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
145            DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
146        }
147    }
148}
149
150/// Configuration for the `datadog_metrics` sink.
151#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
152#[derive(Clone, Debug, Default)]
153#[serde(deny_unknown_fields)]
154pub struct DatadogMetricsConfig {
155    #[serde(flatten)]
156    pub local_dd_common: LocalDatadogCommonConfig,
157
158    /// Sets the default namespace for any metrics sent.
159    ///
160    /// This namespace is only used if a metric has no existing namespace. When a namespace is
161    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
162    #[configurable(metadata(docs::examples = "myservice"))]
163    #[serde(default)]
164    pub default_namespace: Option<String>,
165
166    #[configurable(derived)]
167    #[serde(default)]
168    pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
169
170    #[configurable(derived)]
171    #[serde(default)]
172    pub request: TowerRequestConfig,
173}
174
175impl_generate_config_from_default!(DatadogMetricsConfig);
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "datadog_metrics")]
179impl SinkConfig for DatadogMetricsConfig {
180    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
181        let client = self.build_client(&cx.proxy)?;
182        let global = cx.extra_context.get_or_default::<datadog::Options>();
183        let dd_common = self.local_dd_common.with_globals(global)?;
184        let healthcheck = dd_common.build_healthcheck(client.clone())?;
185        let sink = self.build_sink(&dd_common, client)?;
186
187        Ok((sink, healthcheck))
188    }
189
190    fn input(&self) -> Input {
191        Input::metric()
192    }
193
194    fn acknowledgements(&self) -> &AcknowledgementsConfig {
195        &self.local_dd_common.acknowledgements
196    }
197}
198
199impl DatadogMetricsConfig {
200    /// Gets the base URI of the Datadog agent API.
201    ///
202    /// Per the Datadog agent convention, we should include a unique identifier as part of the
203    /// domain to indicate that these metrics are being submitted by Vector, including the version,
204    /// likely useful for detecting if a specific version of the agent (Vector, in this case) is
205    /// doing something wrong, for understanding issues from the API side.
206    ///
207    /// The `endpoint` configuration field will be used here if it is present.
208    fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
209        dd_common.endpoint.clone().unwrap_or_else(|| {
210            let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
211            format!(
212                "https://{}-vector.agent.{}",
213                version,
214                dd_common.site.as_str()
215            )
216        })
217    }
218
219    /// Generates the `DatadogMetricsEndpointConfiguration`, used for mapping endpoints to their URI.
220    fn generate_metrics_endpoint_configuration(
221        &self,
222        dd_common: &DatadogCommonConfig,
223    ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
224        let base_uri = self.get_base_agent_endpoint(dd_common);
225
226        let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
227        let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
228
229        Ok(DatadogMetricsEndpointConfiguration::new(
230            series_endpoint,
231            sketches_endpoint,
232        ))
233    }
234
235    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
236        let default_tls_config;
237
238        let tls_settings = MaybeTlsSettings::from_config(
239            Some(match self.local_dd_common.tls.as_ref() {
240                Some(config) => config,
241                None => {
242                    default_tls_config = TlsEnableableConfig::enabled();
243                    &default_tls_config
244                }
245            }),
246            false,
247        )?;
248        let client = HttpClient::new(tls_settings, proxy)?;
249        Ok(client)
250    }
251
252    fn build_sink(
253        &self,
254        dd_common: &DatadogCommonConfig,
255        client: HttpClient,
256    ) -> crate::Result<VectorSink> {
257        let batcher_settings = self.batch.into_batcher_settings()?;
258
259        // TODO: revisit our concurrency and batching defaults
260        let request_limits = self.request.into_settings();
261
262        let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
263        let service = ServiceBuilder::new()
264            .settings(request_limits, DatadogMetricsRetryLogic)
265            .service(DatadogMetricsService::new(
266                client,
267                dd_common.default_api_key.inner(),
268            ));
269
270        let request_builder = DatadogMetricsRequestBuilder::new(
271            endpoint_configuration,
272            self.default_namespace.clone(),
273        )?;
274
275        let protocol = self.get_protocol(dd_common);
276        let sink = DatadogMetricsSink::new(service, request_builder, batcher_settings, protocol);
277
278        Ok(VectorSink::from_event_streamsink(sink))
279    }
280
281    fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
282        self.get_base_agent_endpoint(dd_common)
283            .parse::<Uri>()
284            .unwrap()
285            .scheme_str()
286            .unwrap_or("http")
287            .to_string()
288    }
289}
290
291fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
292    let result = format!("{host}{endpoint}")
293        .parse::<Uri>()
294        .context(UriParseSnafu)?;
295    Ok(result)
296}
297
#[cfg(test)]
mod tests {
    use super::DatadogMetricsConfig;

    /// Runs the shared config-generation test helper against `DatadogMetricsConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogMetricsConfig>();
    }
}
306}