vector/sinks/datadog/metrics/
config.rs

1use std::sync::OnceLock;
2
3use http::Uri;
4use snafu::ResultExt;
5use tower::ServiceBuilder;
6use vector_lib::{config::proxy::ProxyConfig, configurable::configurable_component};
7
8use super::{
9    request_builder::DatadogMetricsRequestBuilder,
10    service::{DatadogMetricsRetryLogic, DatadogMetricsService},
11    sink::DatadogMetricsSink,
12};
13use crate::{
14    common::datadog,
15    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16    http::HttpClient,
17    sinks::{
18        Healthcheck, UriParseSnafu, VectorSink,
19        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20        util::{ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, batch::BatchConfig},
21    },
22    tls::{MaybeTlsSettings, TlsEnableableConfig},
23};
24
/// Marker type that carries the default batch settings for the `datadog_metrics` sink.
///
/// It holds no data; it only exists so that batch defaults can be attached to it as
/// associated constants.
#[derive(Debug, Default, Clone, Copy)]
pub struct DatadogMetricsDefaultBatchSettings;
27
28// This default is centered around "series" data, which should be the lion's share of what we
29// process.  Given that a single series, when encoded, is in the 150-300 byte range, we can fit a
30// lot of these into a single request, something like 150-200K series.  Simply to be a little more
31// conservative, though, we use 100K here.  This will also get a little more tricky when it comes to
32// distributions and sketches, but we're going to have to implement incremental encoding to handle
33// "we've exceeded our maximum payload size, split this batch" scenarios anyways.
34impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
35    const MAX_EVENTS: Option<usize> = Some(100_000);
36    const MAX_BYTES: Option<usize> = None;
37    const TIMEOUT_SECS: f64 = 2.0;
38}
39
40pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
41pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
42pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
43
/// Version of the Datadog series API that series metrics are submitted to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SeriesApiVersion {
    /// The v1 series API.
    V1,
    /// The v2 series API.
    V2,
}
49
50impl SeriesApiVersion {
51    pub const fn get_path(self) -> &'static str {
52        match self {
53            Self::V1 => SERIES_V1_PATH,
54            Self::V2 => SERIES_V2_PATH,
55        }
56    }
57    fn get_api_version() -> Self {
58        static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
59        *API_VERSION.get_or_init(|| {
60            match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
61                Ok(_) => Self::V2,
62                Err(_) => Self::V1,
63            }
64        })
65    }
66}
67
68/// Various metric type-specific API types.
69///
70/// Each of these corresponds to a specific request path when making a request to the agent API.
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
72pub enum DatadogMetricsEndpoint {
73    Series(SeriesApiVersion),
74    Sketches,
75}
76
77/// Payload limits for metrics are endpoint-dependent.
78pub(super) struct DatadogMetricsPayloadLimits {
79    pub(super) uncompressed: usize,
80    pub(super) compressed: usize,
81}
82
83impl DatadogMetricsEndpoint {
84    /// Gets the content type associated with the specific encoder for a given metric endpoint.
85    pub const fn content_type(self) -> &'static str {
86        match self {
87            Self::Series(SeriesApiVersion::V1) => "application/json",
88            Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
89        }
90    }
91
92    // Gets whether or not this is a series endpoint.
93    pub const fn is_series(self) -> bool {
94        matches!(self, Self::Series { .. })
95    }
96
97    // Creates an instance of the `Series` variant with the default API version.
98    pub fn series() -> Self {
99        Self::Series(SeriesApiVersion::get_api_version())
100    }
101
102    pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
103        // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics
104
105        let (uncompressed, compressed) = match self {
106            // Sketches use the same payload size limits as v1 series
107            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
108            | DatadogMetricsEndpoint::Sketches => (
109                62_914_560, // 60 MiB
110                3_200_000,  // 3.2 MB
111            ),
112            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
113                5_242_880, // 5 MiB
114                512_000,   // 512 KB
115            ),
116        };
117
118        DatadogMetricsPayloadLimits {
119            uncompressed,
120            compressed,
121        }
122    }
123}
124
125/// Maps Datadog metric endpoints to their actual URI.
126pub struct DatadogMetricsEndpointConfiguration {
127    series_endpoint: Uri,
128    sketches_endpoint: Uri,
129}
130
131impl DatadogMetricsEndpointConfiguration {
132    /// Creates a new `DatadogMEtricsEndpointConfiguration`.
133    pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
134        Self {
135            series_endpoint,
136            sketches_endpoint,
137        }
138    }
139
140    /// Gets the URI for the given Datadog metrics endpoint.
141    pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
142        match endpoint {
143            DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
144            DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
145        }
146    }
147}
148
/// Configuration for the `datadog_metrics` sink.
// NOTE(review): the `///` text on this struct and its fields is presumably consumed by
// `configurable_component` when generating configuration docs -- confirm before rewording it.
#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogMetricsConfig {
    // Settings shared with other Datadog components (TLS and acknowledgements are read from
    // here elsewhere in this file). Flattened, so its keys sit at the same level as the fields
    // below rather than under a nested key.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    /// Sets the default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
    #[configurable(metadata(docs::examples = "myservice"))]
    #[serde(default)]
    pub default_namespace: Option<String>,

    // Batching configuration, parameterized by `DatadogMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,

    // Request configuration; converted with `into_settings()` when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
}

// NOTE(review): this macro is defined elsewhere; presumably it derives the generated example
// configuration for this sink from its `Default` impl -- confirm against the macro definition.
impl_generate_config_from_default!(DatadogMetricsConfig);
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "datadog_metrics")]
178impl SinkConfig for DatadogMetricsConfig {
179    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
180        let client = self.build_client(&cx.proxy)?;
181        let global = cx.extra_context.get_or_default::<datadog::Options>();
182        let dd_common = self.local_dd_common.with_globals(global)?;
183        let healthcheck = dd_common.build_healthcheck(client.clone())?;
184        let sink = self.build_sink(&dd_common, client)?;
185
186        Ok((sink, healthcheck))
187    }
188
189    fn input(&self) -> Input {
190        Input::metric()
191    }
192
193    fn acknowledgements(&self) -> &AcknowledgementsConfig {
194        &self.local_dd_common.acknowledgements
195    }
196}
197
198impl DatadogMetricsConfig {
199    /// Gets the base URI of the Datadog agent API.
200    ///
201    /// Per the Datadog agent convention, we should include a unique identifier as part of the
202    /// domain to indicate that these metrics are being submitted by Vector, including the version,
203    /// likely useful for detecting if a specific version of the agent (Vector, in this case) is
204    /// doing something wrong, for understanding issues from the API side.
205    ///
206    /// The `endpoint` configuration field will be used here if it is present.
207    fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
208        dd_common.endpoint.clone().unwrap_or_else(|| {
209            let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
210            format!(
211                "https://{}-vector.agent.{}",
212                version,
213                dd_common.site.as_str()
214            )
215        })
216    }
217
218    /// Generates the `DatadogMetricsEndpointConfiguration`, used for mapping endpoints to their URI.
219    fn generate_metrics_endpoint_configuration(
220        &self,
221        dd_common: &DatadogCommonConfig,
222    ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
223        let base_uri = self.get_base_agent_endpoint(dd_common);
224
225        let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
226        let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
227
228        Ok(DatadogMetricsEndpointConfiguration::new(
229            series_endpoint,
230            sketches_endpoint,
231        ))
232    }
233
234    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
235        let default_tls_config;
236
237        let tls_settings = MaybeTlsSettings::from_config(
238            Some(match self.local_dd_common.tls.as_ref() {
239                Some(config) => config,
240                None => {
241                    default_tls_config = TlsEnableableConfig::enabled();
242                    &default_tls_config
243                }
244            }),
245            false,
246        )?;
247        let client = HttpClient::new(tls_settings, proxy)?;
248        Ok(client)
249    }
250
251    fn build_sink(
252        &self,
253        dd_common: &DatadogCommonConfig,
254        client: HttpClient,
255    ) -> crate::Result<VectorSink> {
256        let batcher_settings = self.batch.into_batcher_settings()?;
257
258        // TODO: revisit our concurrency and batching defaults
259        let request_limits = self.request.into_settings();
260
261        let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
262        let service = ServiceBuilder::new()
263            .settings(request_limits, DatadogMetricsRetryLogic)
264            .service(DatadogMetricsService::new(
265                client,
266                dd_common.default_api_key.inner(),
267            ));
268
269        let request_builder = DatadogMetricsRequestBuilder::new(
270            endpoint_configuration,
271            self.default_namespace.clone(),
272        )?;
273
274        let protocol = self.get_protocol(dd_common);
275        let sink = DatadogMetricsSink::new(service, request_builder, batcher_settings, protocol);
276
277        Ok(VectorSink::from_event_streamsink(sink))
278    }
279
280    fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
281        self.get_base_agent_endpoint(dd_common)
282            .parse::<Uri>()
283            .unwrap()
284            .scheme_str()
285            .unwrap_or("http")
286            .to_string()
287    }
288}
289
290fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
291    let result = format!("{host}{endpoint}")
292        .parse::<Uri>()
293        .context(UriParseSnafu)?;
294    Ok(result)
295}
296
#[cfg(test)]
mod tests {
    use super::DatadogMetricsConfig;

    /// Runs the shared generated-config test helper against this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogMetricsConfig>();
    }
}