1use futures_util::FutureExt;
2use http::{Request, StatusCode, Uri};
3use hyper::body::Body;
4use snafu::Snafu;
5use vector_lib::{
6 config::AcknowledgementsConfig, configurable::configurable_component,
7 sensitive_string::SensitiveString, tls::TlsEnableableConfig,
8};
9
10use super::Healthcheck;
11use crate::{
12 common::datadog,
13 http::{HttpClient, HttpError},
14 sinks::HealthcheckError,
15};
16
// Each submodule below is compiled only when the matching `sinks-datadog_*` Cargo feature is
// enabled.
#[cfg(feature = "sinks-datadog_events")]
pub mod events;
#[cfg(feature = "sinks-datadog_logs")]
pub mod logs;
#[cfg(feature = "sinks-datadog_metrics")]
pub mod metrics;
// `test_utils` is built when the logs or metrics sink feature is enabled AND either the
// `test-utils` feature is on or the crate is being compiled for tests.
#[cfg(any(
    all(feature = "sinks-datadog_logs", feature = "test-utils"),
    all(feature = "sinks-datadog_metrics", feature = "test-utils"),
    all(feature = "sinks-datadog_logs", test),
    all(feature = "sinks-datadog_metrics", test)
))]
pub mod test_utils;
#[cfg(feature = "sinks-datadog_traces")]
pub mod traces;
32
/// Common Datadog connection settings as written in a component's own configuration.
///
/// `site` and `default_api_key` may be omitted here; `with_globals` fills them in from the
/// global Datadog options and yields a `DatadogCommonConfig`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct LocalDatadogCommonConfig {
    /// The endpoint to send requests to, given as a URL (see the examples).
    ///
    /// When API URLs are built, this value is passed to `datadog::get_api_base_endpoint`
    /// together with `site`.
    // NOTE(review): presumably takes precedence over `site` when set; confirm against
    // `datadog::get_api_base_endpoint`.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "http://127.0.0.1:8080"))]
    #[configurable(metadata(docs::examples = "http://example.com:12345"))]
    #[serde(default)]
    pub endpoint: Option<String>,

    /// The Datadog site to send to.
    ///
    /// When unset, the site from the global Datadog options is used.
    #[configurable(metadata(docs::examples = "us3.datadoghq.com"))]
    #[configurable(metadata(docs::examples = "datadoghq.eu"))]
    pub site: Option<String>,

    /// The default Datadog API key.
    ///
    /// When unset, the API key from the global Datadog options is used. If neither is set,
    /// resolving the configuration fails with `ConfigurationError::ApiKeyRequired`.
    #[configurable(metadata(docs::examples = "${DATADOG_API_KEY_ENV_VAR}"))]
    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
    pub default_api_key: Option<SensitiveString>,

    // Optional TLS settings; the schema description is derived from `TlsEnableableConfig`.
    // This field is not read anywhere in this file.
    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<TlsEnableableConfig>,

    // Acknowledgement settings; the schema description is derived from
    // `AcknowledgementsConfig`. Falls back to its default when absent and is omitted from
    // serialized output when `crate::serde::is_default` returns true for it.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form; confirm in `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
91
92impl LocalDatadogCommonConfig {
93 pub fn new(
94 endpoint: Option<String>,
95 site: Option<String>,
96 default_api_key: Option<SensitiveString>,
97 ) -> Self {
98 Self {
99 endpoint,
100 site,
101 default_api_key,
102 ..Default::default()
103 }
104 }
105
106 pub fn with_globals(
107 &self,
108 config: datadog::Options,
109 ) -> Result<DatadogCommonConfig, ConfigurationError> {
110 Ok(DatadogCommonConfig {
111 endpoint: self.endpoint.clone(),
112 site: self.site.clone().unwrap_or(config.site),
113 default_api_key: self
114 .default_api_key
115 .clone()
116 .or(config.api_key)
117 .ok_or(ConfigurationError::ApiKeyRequired)?,
118 acknowledgements: self.acknowledgements,
119 })
120 }
121}
122
/// Errors produced while resolving the Datadog configuration.
#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum ConfigurationError {
    /// Neither the component configuration nor the global Datadog options supplied an API key.
    #[snafu(display("API Key must be specified."))]
    ApiKeyRequired,
}
128
/// Resolved Datadog settings, as produced by `LocalDatadogCommonConfig::with_globals`.
///
/// Unlike the local form, `site` and `default_api_key` are always present here.
#[derive(Clone, Debug, Default)]
pub struct DatadogCommonConfig {
    /// Optional endpoint, copied unchanged from the local configuration.
    pub endpoint: Option<String>,
    /// Datadog site: the local value if set, otherwise the global one.
    pub site: String,
    /// API key: the local value if set, otherwise the global one. Sent as the `DD-API-KEY`
    /// header by the healthcheck.
    pub default_api_key: SensitiveString,
    /// Acknowledgement settings, copied unchanged from the local configuration.
    pub acknowledgements: AcknowledgementsConfig,
}
136
137impl DatadogCommonConfig {
138 pub fn build_healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
141 let validate_endpoint = self.get_api_endpoint("/api/v1/validate")?;
142
143 let api_key: String = self.default_api_key.clone().into();
144
145 Ok(build_healthcheck_future(client, validate_endpoint, api_key).boxed())
146 }
147
148 fn get_api_endpoint(&self, path: &str) -> crate::Result<Uri> {
152 let base = datadog::get_api_base_endpoint(self.endpoint.as_deref(), self.site.as_str());
153 [&base, path].join("").parse().map_err(Into::into)
154 }
155}
156
157async fn build_healthcheck_future(
159 client: HttpClient,
160 validate_endpoint: Uri,
161 api_key: String,
162) -> crate::Result<()> {
163 let request = Request::get(validate_endpoint)
164 .header("DD-API-KEY", api_key)
165 .body(hyper::Body::empty())
166 .map_err(|e| format!("Failed to make HTTP(S) request: {e:?}"))?;
167
168 let response = client.send(request).await?;
169
170 match response.status() {
171 StatusCode::OK => Ok(()),
172 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
173 }
174}
175
/// Errors from a Datadog API request: either the request itself failed, or the response
/// carried a non-success HTTP status (see `DatadogApiError::from_result` for the mapping).
#[derive(Debug, Snafu)]
pub enum DatadogApiError {
    /// The request failed before a response was obtained; wraps the underlying `HttpError`.
    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
    HttpError { error: HttpError },
    /// The response status was `400 Bad Request`.
    #[snafu(display("Client request was not valid for unknown reasons."))]
    BadRequest,
    /// The response status was `401 Unauthorized`.
    #[snafu(display("Client request was unauthorized."))]
    Unauthorized,
    /// The response status was `403 Forbidden`.
    #[snafu(display("Client request was forbidden."))]
    Forbidden,
    /// The response status was `408 Request Timeout`.
    #[snafu(display("Client request timed out."))]
    RequestTimeout,
    /// The response status was `413 Payload Too Large`.
    #[snafu(display("Client sent a payload that is too large."))]
    PayloadTooLarge,
    /// The response status was `429 Too Many Requests`.
    #[snafu(display("Client sent too many requests (rate limiting)."))]
    TooManyRequests,
    /// The response status was a client error not covered by a more specific variant.
    #[snafu(display("Client request was invalid."))]
    ClientError,
    /// Catch-all for every remaining non-success status. `from_result` uses this for anything
    /// that is neither a success nor a client error, not only 5xx codes.
    #[snafu(display("Server responded with an error."))]
    ServerError,
}
197
198impl DatadogApiError {
199 pub fn from_result(
202 result: Result<http::Response<Body>, HttpError>,
203 ) -> Result<http::Response<Body>, DatadogApiError> {
204 match result {
205 Ok(response) => {
206 match response.status() {
207 s if s.is_success() => Ok(response),
225 StatusCode::BAD_REQUEST => Err(DatadogApiError::BadRequest),
226 StatusCode::UNAUTHORIZED => Err(DatadogApiError::Unauthorized),
227 StatusCode::FORBIDDEN => Err(DatadogApiError::Forbidden),
228 StatusCode::REQUEST_TIMEOUT => Err(DatadogApiError::RequestTimeout),
229 StatusCode::PAYLOAD_TOO_LARGE => Err(DatadogApiError::PayloadTooLarge),
230 StatusCode::TOO_MANY_REQUESTS => Err(DatadogApiError::TooManyRequests),
231 s if s.is_client_error() => Err(DatadogApiError::ClientError),
232 _ => Err(DatadogApiError::ServerError),
233 }
234 }
235 Err(error) => Err(DatadogApiError::HttpError { error }),
236 }
237 }
238
239 pub const fn is_retriable(&self) -> bool {
240 match self {
241 DatadogApiError::HttpError { error } => error.is_retriable(),
248 DatadogApiError::BadRequest | DatadogApiError::PayloadTooLarge => false,
249 DatadogApiError::ServerError
250 | DatadogApiError::ClientError
251 | DatadogApiError::Unauthorized
252 | DatadogApiError::Forbidden
253 | DatadogApiError::RequestTimeout
254 | DatadogApiError::TooManyRequests => true,
255 }
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Global Datadog options with site `tomato.com` and the given API key, if any.
    fn global_options(api_key: Option<&str>) -> datadog::Options {
        datadog::Options {
            api_key: api_key.map(|key| SensitiveString::from(key.to_string())),
            site: "tomato.com".into(),
        }
    }

    /// Locally configured site and key must win over the global ones.
    #[test]
    fn local_config_with_no_overrides() {
        let local = LocalDatadogCommonConfig::new(
            None,
            Some("potato.com".into()),
            Some("key".to_string().into()),
        );

        let resolved = local.with_globals(global_options(Some("more key"))).unwrap();

        assert!(resolved.endpoint.is_none());
        assert_eq!(resolved.site, "potato.com");
        assert_eq!(
            resolved.default_api_key,
            SensitiveString::from("key".to_string())
        );
    }

    /// With nothing set locally, the global site and key are used.
    #[test]
    fn local_config_with_overrides() {
        let local = LocalDatadogCommonConfig::new(None, None, None);

        let resolved = local.with_globals(global_options(Some("more key"))).unwrap();

        assert!(resolved.endpoint.is_none());
        assert_eq!(resolved.site, "tomato.com");
        assert_eq!(
            resolved.default_api_key,
            SensitiveString::from("more key".to_string())
        );
    }

    /// A missing key on both sides is reported as `ApiKeyRequired`.
    #[test]
    fn no_api_key() {
        let local = LocalDatadogCommonConfig::new(None, None, None);

        let failure = local.with_globals(global_options(None)).unwrap_err();

        assert_eq!(failure, ConfigurationError::ApiKeyRequired);
    }
}