vector/sinks/datadog/
mod.rs

1use futures_util::FutureExt;
2use http::{Request, StatusCode, Uri};
3use hyper::body::Body;
4use snafu::Snafu;
5use vector_lib::{
6    config::AcknowledgementsConfig, configurable::configurable_component,
7    sensitive_string::SensitiveString, tls::TlsEnableableConfig,
8};
9
10use super::Healthcheck;
11use crate::{
12    common::datadog,
13    http::{HttpClient, HttpError},
14    sinks::HealthcheckError,
15};
16
17#[cfg(feature = "sinks-datadog_events")]
18pub mod events;
19#[cfg(feature = "sinks-datadog_logs")]
20pub mod logs;
21#[cfg(feature = "sinks-datadog_metrics")]
22pub mod metrics;
23#[cfg(any(
24    all(feature = "sinks-datadog_logs", feature = "test-utils"),
25    all(feature = "sinks-datadog_metrics", feature = "test-utils"),
26    all(feature = "sinks-datadog_logs", test),
27    all(feature = "sinks-datadog_metrics", test)
28))]
29pub mod test_utils;
30#[cfg(feature = "sinks-datadog_traces")]
31pub mod traces;
32
33/// Shared configuration for Datadog sinks.
34/// Contains the maximum set of common settings that applies to all DD sink components.
35#[configurable_component]
36#[derive(Clone, Debug, Default)]
37#[serde(deny_unknown_fields)]
38pub struct LocalDatadogCommonConfig {
39    /// The endpoint to send observability data to.
40    ///
41    /// The endpoint must contain an HTTP scheme, and may specify a hostname or IP
42    /// address and port. The API path should NOT be specified as this is handled by
43    /// the sink.
44    ///
45    /// If set, overrides the `site` option.
46    #[configurable(metadata(docs::advanced))]
47    #[configurable(metadata(docs::examples = "http://127.0.0.1:8080"))]
48    #[configurable(metadata(docs::examples = "http://example.com:12345"))]
49    #[serde(default)]
50    pub endpoint: Option<String>,
51
52    /// The Datadog [site][dd_site] to send observability data to.
53    ///
54    /// This value can also be set by specifying the `DD_SITE` environment variable.
55    /// The value specified here takes precedence over the environment variable.
56    ///
57    /// If not specified by the environment variable, a default value of
58    /// `datadoghq.com` is taken.
59    ///
60    /// [dd_site]: https://docs.datadoghq.com/getting_started/site
61    #[configurable(metadata(docs::examples = "us3.datadoghq.com"))]
62    #[configurable(metadata(docs::examples = "datadoghq.eu"))]
63    pub site: Option<String>,
64
65    /// The default Datadog [API key][api_key] to use in authentication of HTTP requests.
66    ///
67    /// If an event has a Datadog [API key][api_key] set explicitly in its metadata, it takes
68    /// precedence over this setting.
69    ///
70    /// This value can also be set by specifying the `DD_API_KEY` environment variable.
71    /// The value specified here takes precedence over the environment variable.
72    ///
73    /// [api_key]: https://docs.datadoghq.com/api/?lang=bash#authentication
74    /// [global_options]: /docs/reference/configuration/global-options/#datadog
75    #[configurable(metadata(docs::examples = "${DATADOG_API_KEY_ENV_VAR}"))]
76    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
77    pub default_api_key: Option<SensitiveString>,
78
79    #[configurable(derived)]
80    #[serde(default)]
81    pub tls: Option<TlsEnableableConfig>,
82
83    #[configurable(derived)]
84    #[serde(
85        default,
86        deserialize_with = "crate::serde::bool_or_struct",
87        skip_serializing_if = "crate::serde::is_default"
88    )]
89    pub acknowledgements: AcknowledgementsConfig,
90}
91
92impl LocalDatadogCommonConfig {
93    pub fn new(
94        endpoint: Option<String>,
95        site: Option<String>,
96        default_api_key: Option<SensitiveString>,
97    ) -> Self {
98        Self {
99            endpoint,
100            site,
101            default_api_key,
102            ..Default::default()
103        }
104    }
105
106    pub fn with_globals(
107        &self,
108        config: datadog::Options,
109    ) -> Result<DatadogCommonConfig, ConfigurationError> {
110        Ok(DatadogCommonConfig {
111            endpoint: self.endpoint.clone(),
112            site: self.site.clone().unwrap_or(config.site),
113            default_api_key: self
114                .default_api_key
115                .clone()
116                .or(config.api_key)
117                .ok_or(ConfigurationError::ApiKeyRequired)?,
118            acknowledgements: self.acknowledgements,
119        })
120    }
121}
122
123#[derive(Debug, Snafu, PartialEq, Eq)]
124pub enum ConfigurationError {
125    #[snafu(display("API Key must be specified."))]
126    ApiKeyRequired,
127}
128
129#[derive(Clone, Debug, Default)]
130pub struct DatadogCommonConfig {
131    pub endpoint: Option<String>,
132    pub site: String,
133    pub default_api_key: SensitiveString,
134    pub acknowledgements: AcknowledgementsConfig,
135}
136
137impl DatadogCommonConfig {
138    /// Returns a `Healthcheck` which is a future that will be used to ensure the
139    /// `<site>/api/v1/validate` endpoint is reachable.
140    pub fn build_healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
141        let validate_endpoint = self.get_api_endpoint("/api/v1/validate")?;
142
143        let api_key: String = self.default_api_key.clone().into();
144
145        Ok(build_healthcheck_future(client, validate_endpoint, api_key).boxed())
146    }
147
148    /// Gets the API endpoint with a given suffix path.
149    ///
150    /// If `endpoint` is not specified, we fallback to `site`.
151    fn get_api_endpoint(&self, path: &str) -> crate::Result<Uri> {
152        let base = datadog::get_api_base_endpoint(self.endpoint.as_deref(), self.site.as_str());
153        [&base, path].join("").parse().map_err(Into::into)
154    }
155}
156
157/// Makes a GET HTTP request to `<site>/api/v1/validate` using the provided client and API key.
158async fn build_healthcheck_future(
159    client: HttpClient,
160    validate_endpoint: Uri,
161    api_key: String,
162) -> crate::Result<()> {
163    let request = Request::get(validate_endpoint)
164        .header("DD-API-KEY", api_key)
165        .body(hyper::Body::empty())
166        .map_err(|e| format!("Failed to make HTTP(S) request: {e:?}"))?;
167
168    let response = client.send(request).await?;
169
170    match response.status() {
171        StatusCode::OK => Ok(()),
172        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
173    }
174}
175
176#[derive(Debug, Snafu)]
177pub enum DatadogApiError {
178    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
179    HttpError { error: HttpError },
180    #[snafu(display("Client request was not valid for unknown reasons."))]
181    BadRequest,
182    #[snafu(display("Client request was unauthorized."))]
183    Unauthorized,
184    #[snafu(display("Client request was forbidden."))]
185    Forbidden,
186    #[snafu(display("Client request timed out."))]
187    RequestTimeout,
188    #[snafu(display("Client sent a payload that is too large."))]
189    PayloadTooLarge,
190    #[snafu(display("Client sent too many requests (rate limiting)."))]
191    TooManyRequests,
192    #[snafu(display("Client request was invalid."))]
193    ClientError,
194    #[snafu(display("Server responded with an error."))]
195    ServerError,
196}
197
198impl DatadogApiError {
199    /// Common DatadogApiError handling for HTTP Responses.
200    /// Returns Ok(response) if the response was Ok/Accepted.
201    pub fn from_result(
202        result: Result<http::Response<Body>, HttpError>,
203    ) -> Result<http::Response<Body>, DatadogApiError> {
204        match result {
205            Ok(response) => {
206                match response.status() {
207                    // From https://docs.datadoghq.com/api/latest/logs/:
208                    //
209                    // The status codes answered by the HTTP API are:
210                    // 200: OK (v1)
211                    // 202: Accepted (v2)
212                    // 400: Bad request (likely an issue in the payload
213                    //      formatting)
214                    // 401: Unauthorized (likely a missing API Key))
215                    // 403: Permission issue (likely using an invalid API Key)
216                    // 408: Request Timeout, request should be retried after some
217                    // 413: Payload too large (batch is above 5MB uncompressed)
218                    // 429: Too Many Requests, request should be retried after some time
219                    // 500: Internal Server Error, the server encountered an unexpected condition
220                    //      that prevented it from fulfilling the request, request should be
221                    //      retried after some time
222                    // 503: Service Unavailable, the server is not ready to handle the request
223                    //      probably because it is overloaded, request should be retried after some time
224                    s if s.is_success() => Ok(response),
225                    StatusCode::BAD_REQUEST => Err(DatadogApiError::BadRequest),
226                    StatusCode::UNAUTHORIZED => Err(DatadogApiError::Unauthorized),
227                    StatusCode::FORBIDDEN => Err(DatadogApiError::Forbidden),
228                    StatusCode::REQUEST_TIMEOUT => Err(DatadogApiError::RequestTimeout),
229                    StatusCode::PAYLOAD_TOO_LARGE => Err(DatadogApiError::PayloadTooLarge),
230                    StatusCode::TOO_MANY_REQUESTS => Err(DatadogApiError::TooManyRequests),
231                    s if s.is_client_error() => Err(DatadogApiError::ClientError),
232                    _ => Err(DatadogApiError::ServerError),
233                }
234            }
235            Err(error) => Err(DatadogApiError::HttpError { error }),
236        }
237    }
238
239    pub const fn is_retriable(&self) -> bool {
240        match self {
241            // This retry logic will be expanded further, but specifically retrying unauthorized
242            // requests and lower level HttpErrors for now.
243            // I verified using `curl` that `403` is the response code for this.
244            //
245            // https://github.com/vectordotdev/vector/issues/10870
246            // https://github.com/vectordotdev/vector/issues/12220
247            DatadogApiError::HttpError { error } => error.is_retriable(),
248            DatadogApiError::BadRequest | DatadogApiError::PayloadTooLarge => false,
249            DatadogApiError::ServerError
250            | DatadogApiError::ClientError
251            | DatadogApiError::Unauthorized
252            | DatadogApiError::Forbidden
253            | DatadogApiError::RequestTimeout
254            | DatadogApiError::TooManyRequests => true,
255        }
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Global options used by every test: site `tomato.com` plus an optional API key.
    fn globals(api_key: Option<&str>) -> datadog::Options {
        datadog::Options {
            api_key: api_key.map(|key| SensitiveString::from(key.to_string())),
            site: "tomato.com".into(),
        }
    }

    /// Values set on the sink are kept even when the globals provide alternatives.
    #[test]
    fn local_config_with_no_overrides() {
        let local = LocalDatadogCommonConfig::new(
            None,
            Some("potato.com".into()),
            Some("key".to_string().into()),
        );

        let merged = local.with_globals(globals(Some("more key"))).unwrap();

        assert_eq!(None, merged.endpoint);
        assert_eq!("potato.com".to_string(), merged.site);
        assert_eq!(
            SensitiveString::from("key".to_string()),
            merged.default_api_key
        );
    }

    /// A sink with nothing set picks up the site and API key from the globals.
    #[test]
    fn local_config_with_overrides() {
        let local = LocalDatadogCommonConfig::new(None, None, None);

        let merged = local.with_globals(globals(Some("more key"))).unwrap();

        assert_eq!(None, merged.endpoint);
        assert_eq!("tomato.com".to_string(), merged.site);
        assert_eq!(
            SensitiveString::from("more key".to_string()),
            merged.default_api_key
        );
    }

    /// Resolution fails when neither the sink nor the globals carry an API key.
    #[test]
    fn no_api_key() {
        let local = LocalDatadogCommonConfig::new(None, None, None);

        let error = local.with_globals(globals(None)).unwrap_err();

        assert_eq!(ConfigurationError::ApiKeyRequired, error);
    }
}
314}