vector/sinks/datadog/metrics/
config.rs1use std::sync::OnceLock;
2
3use http::Uri;
4use snafu::ResultExt;
5use tower::ServiceBuilder;
6use vector_lib::{config::proxy::ProxyConfig, configurable::configurable_component};
7
8use super::{
9 request_builder::DatadogMetricsRequestBuilder,
10 service::{DatadogMetricsRetryLogic, DatadogMetricsService},
11 sink::DatadogMetricsSink,
12};
13use crate::{
14 common::datadog,
15 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
16 http::HttpClient,
17 sinks::{
18 Healthcheck, UriParseSnafu, VectorSink,
19 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
20 util::{ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, batch::BatchConfig},
21 },
22 tls::{MaybeTlsSettings, TlsEnableableConfig},
23};
24
25#[derive(Clone, Copy, Debug, Default)]
26pub struct DatadogMetricsDefaultBatchSettings;
27
28impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings {
35 const MAX_EVENTS: Option<usize> = Some(100_000);
36 const MAX_BYTES: Option<usize> = None;
37 const TIMEOUT_SECS: f64 = 2.0;
38}
39
40pub(super) const SERIES_V1_PATH: &str = "/api/v1/series";
41pub(super) const SERIES_V2_PATH: &str = "/api/v2/series";
42pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches";
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45pub enum SeriesApiVersion {
46 V1,
47 V2,
48}
49
50impl SeriesApiVersion {
51 pub const fn get_path(self) -> &'static str {
52 match self {
53 Self::V1 => SERIES_V1_PATH,
54 Self::V2 => SERIES_V2_PATH,
55 }
56 }
57 fn get_api_version() -> Self {
58 static API_VERSION: OnceLock<SeriesApiVersion> = OnceLock::new();
59 *API_VERSION.get_or_init(|| {
60 match std::env::var("VECTOR_TEMP_USE_DD_METRICS_SERIES_V2_API") {
61 Ok(_) => Self::V2,
62 Err(_) => Self::V1,
63 }
64 })
65 }
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
72pub enum DatadogMetricsEndpoint {
73 Series(SeriesApiVersion),
74 Sketches,
75}
76
77pub(super) struct DatadogMetricsPayloadLimits {
79 pub(super) uncompressed: usize,
80 pub(super) compressed: usize,
81}
82
83impl DatadogMetricsEndpoint {
84 pub const fn content_type(self) -> &'static str {
86 match self {
87 Self::Series(SeriesApiVersion::V1) => "application/json",
88 Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf",
89 }
90 }
91
92 pub const fn is_series(self) -> bool {
94 matches!(self, Self::Series { .. })
95 }
96
97 pub fn series() -> Self {
99 Self::Series(SeriesApiVersion::get_api_version())
100 }
101
102 pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits {
103 let (uncompressed, compressed) = match self {
106 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)
108 | DatadogMetricsEndpoint::Sketches => (
109 62_914_560, 3_200_000, ),
112 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => (
113 5_242_880, 512_000, ),
116 };
117
118 DatadogMetricsPayloadLimits {
119 uncompressed,
120 compressed,
121 }
122 }
123}
124
125pub struct DatadogMetricsEndpointConfiguration {
127 series_endpoint: Uri,
128 sketches_endpoint: Uri,
129}
130
131impl DatadogMetricsEndpointConfiguration {
132 pub const fn new(series_endpoint: Uri, sketches_endpoint: Uri) -> Self {
134 Self {
135 series_endpoint,
136 sketches_endpoint,
137 }
138 }
139
140 pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri {
142 match endpoint {
143 DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(),
144 DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(),
145 }
146 }
147}
148
149#[configurable_component(sink("datadog_metrics", "Publish metric events to Datadog."))]
151#[derive(Clone, Debug, Default)]
152#[serde(deny_unknown_fields)]
153pub struct DatadogMetricsConfig {
154 #[serde(flatten)]
155 pub local_dd_common: LocalDatadogCommonConfig,
156
157 #[configurable(metadata(docs::examples = "myservice"))]
162 #[serde(default)]
163 pub default_namespace: Option<String>,
164
165 #[configurable(derived)]
166 #[serde(default)]
167 pub batch: BatchConfig<DatadogMetricsDefaultBatchSettings>,
168
169 #[configurable(derived)]
170 #[serde(default)]
171 pub request: TowerRequestConfig,
172}
173
174impl_generate_config_from_default!(DatadogMetricsConfig);
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "datadog_metrics")]
178impl SinkConfig for DatadogMetricsConfig {
179 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
180 let client = self.build_client(&cx.proxy)?;
181 let global = cx.extra_context.get_or_default::<datadog::Options>();
182 let dd_common = self.local_dd_common.with_globals(global)?;
183 let healthcheck = dd_common.build_healthcheck(client.clone())?;
184 let sink = self.build_sink(&dd_common, client)?;
185
186 Ok((sink, healthcheck))
187 }
188
189 fn input(&self) -> Input {
190 Input::metric()
191 }
192
193 fn acknowledgements(&self) -> &AcknowledgementsConfig {
194 &self.local_dd_common.acknowledgements
195 }
196}
197
198impl DatadogMetricsConfig {
199 fn get_base_agent_endpoint(&self, dd_common: &DatadogCommonConfig) -> String {
208 dd_common.endpoint.clone().unwrap_or_else(|| {
209 let version = str::replace(crate::built_info::PKG_VERSION, ".", "-");
210 format!(
211 "https://{}-vector.agent.{}",
212 version,
213 dd_common.site.as_str()
214 )
215 })
216 }
217
218 fn generate_metrics_endpoint_configuration(
220 &self,
221 dd_common: &DatadogCommonConfig,
222 ) -> crate::Result<DatadogMetricsEndpointConfiguration> {
223 let base_uri = self.get_base_agent_endpoint(dd_common);
224
225 let series_endpoint = build_uri(&base_uri, SeriesApiVersion::get_api_version().get_path())?;
226 let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?;
227
228 Ok(DatadogMetricsEndpointConfiguration::new(
229 series_endpoint,
230 sketches_endpoint,
231 ))
232 }
233
234 fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
235 let default_tls_config;
236
237 let tls_settings = MaybeTlsSettings::from_config(
238 Some(match self.local_dd_common.tls.as_ref() {
239 Some(config) => config,
240 None => {
241 default_tls_config = TlsEnableableConfig::enabled();
242 &default_tls_config
243 }
244 }),
245 false,
246 )?;
247 let client = HttpClient::new(tls_settings, proxy)?;
248 Ok(client)
249 }
250
251 fn build_sink(
252 &self,
253 dd_common: &DatadogCommonConfig,
254 client: HttpClient,
255 ) -> crate::Result<VectorSink> {
256 let batcher_settings = self.batch.into_batcher_settings()?;
257
258 let request_limits = self.request.into_settings();
260
261 let endpoint_configuration = self.generate_metrics_endpoint_configuration(dd_common)?;
262 let service = ServiceBuilder::new()
263 .settings(request_limits, DatadogMetricsRetryLogic)
264 .service(DatadogMetricsService::new(
265 client,
266 dd_common.default_api_key.inner(),
267 ));
268
269 let request_builder = DatadogMetricsRequestBuilder::new(
270 endpoint_configuration,
271 self.default_namespace.clone(),
272 )?;
273
274 let protocol = self.get_protocol(dd_common);
275 let sink = DatadogMetricsSink::new(service, request_builder, batcher_settings, protocol);
276
277 Ok(VectorSink::from_event_streamsink(sink))
278 }
279
280 fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
281 self.get_base_agent_endpoint(dd_common)
282 .parse::<Uri>()
283 .unwrap()
284 .scheme_str()
285 .unwrap_or("http")
286 .to_string()
287 }
288}
289
290fn build_uri(host: &str, endpoint: &str) -> crate::Result<Uri> {
291 let result = format!("{host}{endpoint}")
292 .parse::<Uri>()
293 .context(UriParseSnafu)?;
294 Ok(result)
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300
301 #[test]
302 fn generate_config() {
303 crate::test_util::test_generate_config::<DatadogMetricsConfig>();
304 }
305}