vector/sinks/datadog/metrics/
config.rs1use std::sync::OnceLock;
2
3use http::Uri;
4use snafu::ResultExt;
5use tower::ServiceBuilder;
6use vector_lib::config::proxy::ProxyConfig;
7use vector_lib::configurable::configurable_component;
8
9use super::{
10 request_builder::DatadogMetricsRequestBuilder,
11 service::{DatadogMetricsRetryLogic, DatadogMetricsService},
12 sink::DatadogMetricsSink,
13};
14use crate::{
15 common::datadog,
16 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
17 http::HttpClient,
18 sinks::{
19 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20 util::{batch::BatchConfig, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig},
21 Healthcheck, UriParseSnafu, VectorSink,
22 },
23 tls::{MaybeTlsSettings, TlsEnableableConfig},
24};
25
/// Marker type that carries the default batch settings for the Datadog metrics sink.
///
/// It holds no data; the actual defaults are supplied through its
/// `SinkBatchSettings` implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct DatadogMetricsDefaultBatchSettings;
28
29impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
36 const MAX_EVENTS: Option<usize> = Some(100_000);
37 const MAX_BYTES: Option<usize> = None;
38 const TIMEOUT_SECS: f64 = 2.0;
39}
40
/// URI path appended to the base endpoint for the v1 series API.
pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
/// URI path appended to the base endpoint for the v2 series API.
pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
/// URI path appended to the base endpoint for the sketches API.
pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
44
/// Version of the series API that series metrics are submitted to.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SeriesApiVersion {
    /// The v1 series API.
    V1,
    /// The v2 series API.
    V2,
}
50
51impl SeriesApiVersion {
52 pub const fn get_path(self) -> &'static str {
53 match self {
54 Self::V1 => SERIES_V1_PATH,
55 Self::V2 => SERIES_V2_PATH,
56 }
57 }
58 fn get_api_version() -> Self {
59 static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
60 *API_VERSION.get_or_init(|| {
61 match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
62 Ok(_) => Self::V2,
63 Err(_) => Self::V1,
64 }
65 })
66 }
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
73pub enum DatadogMetricsEndpoint {
74 Series(SeriesApiVersion),
75 Sketches,
76}
77
/// Payload size limits for a metrics endpoint, as returned by
/// `DatadogMetricsEndpoint::payload_limits`.
pub(super) struct DatadogMetricsPayloadLimits {
    /// Limit on the payload size before compression.
    // NOTE(review): presumably expressed in bytes — confirm against the consumer.
    pub(super) uncompressed: usize,
    /// Limit on the payload size after compression.
    pub(super) compressed: usize,
}
83
84impl DatadogMetricsEndpoint {
85 pub const fn content_type(self) -> &'static str {
87 match self {
88 Self::Series(SeriesApiVersion::V1) => "application/json",
89 Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
90 }
91 }
92
93 pub const fn is_series(self) -> bool {
95 matches!(self, Self::Series { .. })
96 }
97
98 pub fn series() -> Self {
100 Self::Series(SeriesApiVersion::get_api_version())
101 }
102
103 pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
104 let (uncompressed, compressed) = match self {
107 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
109 | DatadogMetricsEndpoint::Sketches => (
110 62_914_560, 3_200_000, ),
113 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
114 5_242_880, 512_000, ),
117 };
118
119 DatadogMetricsPayloadLimits {
120 uncompressed,
121 compressed,
122 }
123 }
124}
125
/// Holds the fully-built URI for each metrics endpoint.
pub struct DatadogMetricsEndpointConfiguration {
    /// URI used for series requests.
    series_endpoint: Uri,
    /// URI used for sketches requests.
    sketches_endpoint: Uri,
}
131
132impl DatadogMetricsEndpointConfiguration {
133 pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
135 Self {
136 series_endpoint,
137 sketches_endpoint,
138 }
139 }
140
141 pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
143 match endpoint {
144 DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
145 DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
146 }
147 }
148}
149
// Configuration for the `datadog_metrics` sink.
//
// NOTE(review): plain `//` comments are used on purpose. The attribute macros
// below receive this item's tokens, doc attributes included, so `///` docs
// could change what they generate — confirm before converting these.
#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogMetricsConfig {
    // Shared Datadog settings, flattened into this sink's own keys. This file
    // reads `tls` and `acknowledgements` from it directly and resolves the
    // rest through `with_globals`.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    // Optional namespace handed to `DatadogMetricsRequestBuilder::new`; how it
    // is applied to metrics is decided there.
    #[configurable(metadata(docs::examples = "myservice"))]
    #[serde(default)]
    pub default_namespace: Option<String>,

    // Batching settings, parameterized by `DatadogMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,

    // Request settings; converted with `into_settings()` and applied to the
    // service when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
}

// NOTE(review): judging by the macro name, this generates the example config
// from the `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(DatadogMetricsConfig);
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "datadog_metrics")]
179impl SinkConfig for DatadogMetricsConfig {
180 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
181 let client = self.build_client(&cx.proxy)?;
182 let global = cx.extra_context.get_or_default::<datadog::Options>();
183 let dd_common = self.local_dd_common.with_globals(global)?;
184 let healthcheck = dd_common.build_healthcheck(client.clone())?;
185 let sink = self.build_sink(&dd_common, client)?;
186
187 Ok((sink, healthcheck))
188 }
189
190 fn input(&self) -> Input {
191 Input::metric()
192 }
193
194 fn acknowledgements(&self) -> &AcknowledgementsConfig {
195 &self.local_dd_common.acknowledgements
196 }
197}
198
199impl DatadogMetricsConfig {
200 fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
209 dd_common.endpoint.clone().unwrap_or_else(|| {
210 let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
211 format!(
212 "https://{}-vector.agent.{}",
213 version,
214 dd_common.site.as_str()
215 )
216 })
217 }
218
219 fn generate_metrics_endpoint_configuration(
221 &self,
222 dd_common: &DatadogCommonConfig,
223 ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
224 let base_uri = self.get_base_agent_endpoint(dd_common);
225
226 let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
227 let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
228
229 Ok(DatadogMetricsEndpointConfiguration::new(
230 series_endpoint,
231 sketches_endpoint,
232 ))
233 }
234
235 fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
236 let default_tls_config;
237
238 let tls_settings = MaybeTlsSettings::from_config(
239 Some(match self.local_dd_common.tls.as_ref() {
240 Some(config) => config,
241 None => {
242 default_tls_config = TlsEnableableConfig::enabled();
243 &default_tls_config
244 }
245 }),
246 false,
247 )?;
248 let client = HttpClient::new(tls_settings, proxy)?;
249 Ok(client)
250 }
251
252 fn build_sink(
253 &self,
254 dd_common: &DatadogCommonConfig,
255 client: HttpClient,
256 ) -> crate::Result<VectorSink> {
257 let batcher_settings = self.batch.into_batcher_settings()?;
258
259 let request_limits = self.request.into_settings();
261
262 let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
263 let service = ServiceBuilder::new()
264 .settings(request_limits, DatadogMetricsRetryLogic)
265 .service(DatadogMetricsService::new(
266 client,
267 dd_common.default_api_key.inner(),
268 ));
269
270 let request_builder = DatadogMetricsRequestBuilder::new(
271 endpoint_configuration,
272 self.default_namespace.clone(),
273 )?;
274
275 let protocol = self.get_protocol(dd_common);
276 let sink = DatadogMetricsSink::new(service, request_builder, batcher_settings, protocol);
277
278 Ok(VectorSink::from_event_streamsink(sink))
279 }
280
281 fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
282 self.get_base_agent_endpoint(dd_common)
283 .parse::<Uri>()
284 .unwrap()
285 .scheme_str()
286 .unwrap_or("http")
287 .to_string()
288 }
289}
290
291fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
292 let result = format!("{host}{endpoint}")
293 .parse::<Uri>()
294 .context(UriParseSnafu)?;
295 Ok(result)
296}
297
#[cfg(test)]
mod tests {
    use super::DatadogMetricsConfig;

    /// Runs the shared config-generation check against this sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogMetricsConfig>();
    }
}
306}