vector/sinks/datadog/logs/
config.rs

1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5use vector_lib::{
6    config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
7};
8use vrl::value::Kind;
9
10use super::{service::LogApiRetry, sink::LogSinkBuilder};
11use crate::{
12    common::datadog,
13    http::HttpClient,
14    schema,
15    sinks::{
16        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig, logs::service::LogApiService},
17        prelude::*,
18        util::http::RequestConfig,
19    },
20    tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
// The Datadog Logs API rejects uncompressed payloads above a hard 5MB limit and discards their
// contents. Serializing every event as it arrived used to be very CPU intensive, so instead we
// batch up to a goal that sits 750KB below that hard limit and only then build the payload.
// The headroom absorbs serialization overhead in the common case. Pathological input (for
// example, a string consisting of escaped double-quotes) can still yield an over-large payload,
// but we believe this to be very rare in practice.

/// Hard upper bound, in bytes, on an uncompressed Datadog Logs API payload.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
/// Batching target in bytes: the hard payload limit minus 750KB of headroom.
pub const BATCH_GOAL_BYTES: usize = MAX_PAYLOAD_BYTES - 750_000;
/// Maximum number of events placed in a single batch.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
34
35#[derive(Clone, Copy, Debug, Default)]
36pub struct DatadogLogsDefaultBatchSettings;
37
38impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
39    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
40    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
41    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
42}
43
44/// Configuration for the `datadog_logs` sink.
45#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
46#[derive(Clone, Debug, Derivative)]
47#[derivative(Default)]
48#[serde(deny_unknown_fields)]
49pub struct DatadogLogsConfig {
50    #[serde(flatten)]
51    pub local_dd_common: LocalDatadogCommonConfig,
52
53    #[configurable(derived)]
54    #[derivative(Default(value = "default_compression()"))]
55    #[serde(default = "default_compression")]
56    pub compression: Option<Compression>,
57
58    #[configurable(derived)]
59    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
60    pub encoding: Transformer,
61
62    #[configurable(derived)]
63    #[serde(default)]
64    pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,
65
66    #[configurable(derived)]
67    #[serde(default)]
68    pub request: RequestConfig,
69
70    /// When enabled this sink will normalize events to conform to the Datadog Agent standard. This
71    /// also sends requests to the logs backend with the `DD-PROTOCOL: agent-json` header. This bool
72    /// will be overidden as `true` if this header has already been set in the request.headers
73    /// configuration setting.
74    #[serde(default)]
75    pub conforms_as_agent: bool,
76}
77
78const fn default_compression() -> Option<Compression> {
79    Some(Compression::zstd_default())
80}
81
82impl GenerateConfig for DatadogLogsConfig {
83    fn generate_config() -> toml::Value {
84        toml::from_str(indoc! {r#"
85            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
86        "#})
87        .unwrap()
88    }
89}
90
91impl DatadogLogsConfig {
92    // TODO: We should probably hoist this type of base URI generation so that all DD sinks can
93    // utilize it, since it all follows the same pattern.
94    fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
95        let base_url = dd_common
96            .endpoint
97            .clone()
98            .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
99
100        http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
101    }
102
103    pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
104        self.get_uri(dd_common)
105            .scheme_str()
106            .unwrap_or("http")
107            .to_string()
108    }
109
110    pub fn build_processor(
111        &self,
112        dd_common: &DatadogCommonConfig,
113        client: HttpClient,
114        dd_evp_origin: String,
115    ) -> crate::Result<VectorSink> {
116        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
117        let request_limits = self.request.tower.into_settings();
118
119        // We forcefully cap the provided batch configuration to the size/log line limits imposed by
120        // the Datadog Logs API, but we still allow them to be lowered if need be.
121        let batch = self
122            .batch
123            .validate()?
124            .limit_max_bytes(BATCH_GOAL_BYTES)?
125            .limit_max_events(BATCH_MAX_EVENTS)?
126            .into_batcher_settings()?;
127
128        let headers = {
129            let mut request_headers = self.request.headers.clone();
130            if self.conforms_as_agent {
131                request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
132            }
133            request_headers
134        };
135
136        // conforms_as_agent is true if either the user supplied configuration parameter is enabled
137        // or the DD-PROTOCOL: agent-json header had already been manually set
138        let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
139            value == "agent-json"
140        } else {
141            false
142        };
143
144        let service = ServiceBuilder::new()
145            .settings(request_limits, LogApiRetry)
146            .service(LogApiService::new(
147                client,
148                self.get_uri(dd_common),
149                headers,
150                dd_evp_origin,
151            )?);
152
153        let encoding = self.encoding.clone();
154        let protocol = self.get_protocol(dd_common);
155
156        let sink = LogSinkBuilder::new(
157            encoding,
158            service,
159            default_api_key,
160            batch,
161            protocol,
162            conforms_as_agent,
163        )
164        .compression(self.compression.or_else(default_compression).unwrap())
165        .build();
166
167        Ok(VectorSink::from_event_streamsink(sink))
168    }
169
170    pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
171        let default_tls_config;
172
173        let tls_settings = MaybeTlsSettings::from_config(
174            Some(match self.local_dd_common.tls.as_ref() {
175                Some(config) => config,
176                None => {
177                    default_tls_config = TlsEnableableConfig::enabled();
178                    &default_tls_config
179                }
180            }),
181            false,
182        )?;
183        Ok(HttpClient::new(tls_settings, proxy)?)
184    }
185}
186
187#[async_trait::async_trait]
188#[typetag::serde(name = "datadog_logs")]
189impl SinkConfig for DatadogLogsConfig {
190    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
191        let client = self.create_client(&cx.proxy)?;
192        let global = cx.extra_context.get_or_default::<datadog::Options>();
193        let dd_common = self.local_dd_common.with_globals(global)?;
194
195        let healthcheck = dd_common.build_healthcheck(client.clone())?;
196
197        let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
198
199        Ok((sink, healthcheck))
200    }
201
202    fn input(&self) -> Input {
203        let requirement = schema::Requirement::empty()
204            .optional_meaning(meaning::MESSAGE, Kind::bytes())
205            .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
206            .optional_meaning(meaning::HOST, Kind::bytes())
207            .optional_meaning(meaning::SOURCE, Kind::bytes())
208            .optional_meaning(meaning::SEVERITY, Kind::bytes())
209            .optional_meaning(meaning::SERVICE, Kind::bytes())
210            .optional_meaning(meaning::TRACE_ID, Kind::bytes());
211
212        Input::log().with_schema_requirement(requirement)
213    }
214
215    fn acknowledgements(&self) -> &AcknowledgementsConfig {
216        &self.local_dd_common.acknowledgements
217    }
218}
219
#[cfg(test)]
mod test {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::{codecs::EncodingConfigWithFraming, components::validation::prelude::*};

    /// Deserializes a YAML snippet into a `DatadogLogsConfig`, panicking if it does not parse.
    fn parse_config(yaml: &str) -> DatadogLogsConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    #[test]
    fn compression_config_field() {
        // The serde default for `compression` is zstd.
        assert_eq!(default_compression(), Some(Compression::zstd_default()));

        // Field omitted: `#[serde(default = "default_compression")]` fills in zstd while
        // deserializing.
        let omitted = parse_config(indoc! {r#"
            default_api_key: "test_key"
        "#});
        assert!(matches!(omitted.compression, Some(Compression::Zstd(_))));

        // An explicit "none" is kept as `Some(Compression::None)` rather than collapsing to `None`.
        let explicit_none = parse_config(indoc! {r#"
            default_api_key: "test_key"
            compression: "none"
        "#});
        assert_eq!(explicit_none.compression, Some(Compression::None));

        // Explicit "zstd".
        let explicit_zstd = parse_config(indoc! {r#"
            default_api_key: "test_key"
            compression: "zstd"
        "#});
        assert!(matches!(
            explicit_zstd.compression,
            Some(Compression::Zstd(_))
        ));

        // Explicit "gzip".
        let explicit_gzip = parse_config(indoc! {r#"
            default_api_key: "test_key"
            compression: "gzip"
        "#});
        assert!(matches!(
            explicit_gzip.compression,
            Some(Compression::Gzip(_))
        ));
    }

    impl ValidatableComponent for DatadogLogsConfig {
        fn validation_configuration() -> ValidationConfiguration {
            let endpoint = String::from("http://127.0.0.1:9005");

            let local_dd_common = LocalDatadogCommonConfig {
                endpoint: Some(endpoint.clone()),
                default_api_key: Some(String::from("unused").into()),
                ..Default::default()
            };

            // Compression is disabled so the validator's byte counting stays accurate.
            let config = Self {
                local_dd_common,
                compression: Some(Compression::None),
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding =
                EncodingConfigWithFraming::new(None, serializer.into(), config.encoding.clone());

            let logs_endpoint = format!("{endpoint}/api/v2/logs");
            let logs_uri =
                http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI");
            let resource_config = HttpResourceConfig::from_parts(logs_uri, None);
            let external_resource =
                ExternalResource::new(ResourceDirection::Push, resource_config, encoding);

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}