vector/sinks/datadog/logs/
config.rs

1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5use vector_lib::{
6    config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
7};
8use vrl::value::Kind;
9
10use super::{service::LogApiRetry, sink::LogSinkBuilder};
11use crate::{
12    common::datadog,
13    http::HttpClient,
14    schema,
15    sinks::{
16        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig, logs::service::LogApiService},
17        prelude::*,
18        util::http::RequestConfig,
19    },
20    tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
// Sizing rationale: events used to be serialized as they arrived, which was very CPU
// intensive. To avoid that, batches are accumulated up to a goal that sits 750KB below the
// API maximum and the payload is built afterwards. The trade-off is that an unusual batch --
// for instance, a string of escaped double-quotes -- can still produce an over-large payload,
// but we believe this should be very rare in practice.

/// Hard limit of the Datadog API for uncompressed payloads (5MB). Above this threshold the
/// API will toss results.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
/// Batch size goal: 750KB of headroom below [`MAX_PAYLOAD_BYTES`].
pub const BATCH_GOAL_BYTES: usize = MAX_PAYLOAD_BYTES - 750_000;
/// Maximum number of events in one batch.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
34
35#[derive(Clone, Copy, Debug, Default)]
36pub struct DatadogLogsDefaultBatchSettings;
37
38impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
39    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
40    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
41    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
42}
43
/// Configuration for the `datadog_logs` sink.
#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogLogsConfig {
    // Datadog options local to this sink. Flattened, so its fields are written at the same
    // level as the sink's own options rather than under a nested key. Supplies the TLS and
    // acknowledgement settings and is merged with global options in `build`.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    // Optional compression; `build_processor` falls back to `Compression::default()` when
    // this is unset.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Option<Compression>,

    // Event transformer handed to the sink builder; omitted from serialized output when it
    // equals the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Batch settings. `build_processor` additionally limits them to `BATCH_GOAL_BYTES` and
    // `BATCH_MAX_EVENTS`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,

    // Request settings: the `tower` limits and any extra `headers` sent with each request.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    /// When enabled this sink will normalize events to conform to the Datadog Agent standard. This
    /// also sends requests to the logs backend with the `DD-PROTOCOL: agent-json` header. This bool
    /// will be overridden as `true` if this header has already been set in the request.headers
    /// configuration setting.
    #[serde(default)]
    pub conforms_as_agent: bool,
}
75
76impl GenerateConfig for DatadogLogsConfig {
77    fn generate_config() -> toml::Value {
78        toml::from_str(indoc! {r#"
79            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
80        "#})
81        .unwrap()
82    }
83}
84
85impl DatadogLogsConfig {
86    // TODO: We should probably hoist this type of base URI generation so that all DD sinks can
87    // utilize it, since it all follows the same pattern.
88    fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
89        let base_url = dd_common
90            .endpoint
91            .clone()
92            .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
93
94        http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
95    }
96
97    pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
98        self.get_uri(dd_common)
99            .scheme_str()
100            .unwrap_or("http")
101            .to_string()
102    }
103
104    pub fn build_processor(
105        &self,
106        dd_common: &DatadogCommonConfig,
107        client: HttpClient,
108        dd_evp_origin: String,
109    ) -> crate::Result<VectorSink> {
110        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
111        let request_limits = self.request.tower.into_settings();
112
113        // We forcefully cap the provided batch configuration to the size/log line limits imposed by
114        // the Datadog Logs API, but we still allow them to be lowered if need be.
115        let batch = self
116            .batch
117            .validate()?
118            .limit_max_bytes(BATCH_GOAL_BYTES)?
119            .limit_max_events(BATCH_MAX_EVENTS)?
120            .into_batcher_settings()?;
121
122        let headers = {
123            let mut request_headers = self.request.headers.clone();
124            if self.conforms_as_agent {
125                request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
126            }
127            request_headers
128        };
129
130        // conforms_as_agent is true if either the user supplied configuration parameter is enabled
131        // or the DD-PROTOCOL: agent-json header had already been manually set
132        let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
133            value == "agent-json"
134        } else {
135            false
136        };
137
138        let service = ServiceBuilder::new()
139            .settings(request_limits, LogApiRetry)
140            .service(LogApiService::new(
141                client,
142                self.get_uri(dd_common),
143                headers,
144                dd_evp_origin,
145            )?);
146
147        let encoding = self.encoding.clone();
148        let protocol = self.get_protocol(dd_common);
149
150        let sink = LogSinkBuilder::new(
151            encoding,
152            service,
153            default_api_key,
154            batch,
155            protocol,
156            conforms_as_agent,
157        )
158        .compression(self.compression.unwrap_or_default())
159        .build();
160
161        Ok(VectorSink::from_event_streamsink(sink))
162    }
163
164    pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
165        let default_tls_config;
166
167        let tls_settings = MaybeTlsSettings::from_config(
168            Some(match self.local_dd_common.tls.as_ref() {
169                Some(config) => config,
170                None => {
171                    default_tls_config = TlsEnableableConfig::enabled();
172                    &default_tls_config
173                }
174            }),
175            false,
176        )?;
177        Ok(HttpClient::new(tls_settings, proxy)?)
178    }
179}
180
181#[async_trait::async_trait]
182#[typetag::serde(name = "datadog_logs")]
183impl SinkConfig for DatadogLogsConfig {
184    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
185        let client = self.create_client(&cx.proxy)?;
186        let global = cx.extra_context.get_or_default::<datadog::Options>();
187        let dd_common = self.local_dd_common.with_globals(global)?;
188
189        let healthcheck = dd_common.build_healthcheck(client.clone())?;
190
191        let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
192
193        Ok((sink, healthcheck))
194    }
195
196    fn input(&self) -> Input {
197        let requirement = schema::Requirement::empty()
198            .optional_meaning(meaning::MESSAGE, Kind::bytes())
199            .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
200            .optional_meaning(meaning::HOST, Kind::bytes())
201            .optional_meaning(meaning::SOURCE, Kind::bytes())
202            .optional_meaning(meaning::SEVERITY, Kind::bytes())
203            .optional_meaning(meaning::SERVICE, Kind::bytes())
204            .optional_meaning(meaning::TRACE_ID, Kind::bytes());
205
206        Input::log().with_schema_requirement(requirement)
207    }
208
209    fn acknowledgements(&self) -> &AcknowledgementsConfig {
210        &self.local_dd_common.acknowledgements
211    }
212}
213
#[cfg(test)]
mod test {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::{codecs::EncodingConfigWithFraming, components::validation::prelude::*};

    /// Runs the shared generated-config check against this sink's example configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    impl ValidatableComponent for DatadogLogsConfig {
        /// Describes how the component validation framework should exercise this sink: the
        /// sink pushes JSON-encoded events to a local HTTP resource at the logs intake path.
        fn validation_configuration() -> ValidationConfiguration {
            let base_url = String::from("http://127.0.0.1:9005");

            // The sink appends this path to the endpoint, so the resource listens on it.
            let intake_uri = http::Uri::try_from(format!("{base_url}/api/v2/logs"))
                .expect("should not fail to parse URI");

            let sink_config = Self {
                local_dd_common: LocalDatadogCommonConfig {
                    endpoint: Some(base_url),
                    default_api_key: Some("unused".to_string().into()),
                    ..Default::default()
                },
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let resource_encoding = EncodingConfigWithFraming::new(
                None,
                serializer.into(),
                sink_config.encoding.clone(),
            );

            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(intake_uri, None),
                resource_encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}