vector/sinks/datadog/logs/
config.rs

1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5
6use vector_lib::{
7    config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
8};
9use vrl::value::Kind;
10
11use crate::{
12    common::datadog,
13    http::HttpClient,
14    schema,
15    sinks::{
16        datadog::{logs::service::LogApiService, DatadogCommonConfig, LocalDatadogCommonConfig},
17        prelude::*,
18        util::http::RequestConfig,
19    },
20    tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
23use super::{service::LogApiRetry, sink::LogSinkBuilder};
24
// The Datadog API enforces a hard limit of 5MB on uncompressed payloads and
// tosses anything above that threshold. Events used to be serialized as they
// came in -- a very CPU intensive process -- so to avoid that cost we batch up
// to a goal that sits 750KB below the maximum and only then build the payload.
// In some situations this still kicks out over-large payloads -- for instance,
// a string of escaped double-quotes -- but we believe this should be very rare
// in practice.

/// Hard upper bound, in bytes, for a single uncompressed payload.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
/// Batch size goal in bytes: 750KB of headroom below [`MAX_PAYLOAD_BYTES`].
pub const BATCH_GOAL_BYTES: usize = MAX_PAYLOAD_BYTES - 750_000;
/// Maximum number of events placed in one batch.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
36
37#[derive(Clone, Copy, Debug, Default)]
38pub struct DatadogLogsDefaultBatchSettings;
39
40impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
41    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
42    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
43    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
44}
45
46/// Configuration for the `datadog_logs` sink.
47#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
48#[derive(Clone, Debug, Default)]
49#[serde(deny_unknown_fields)]
50pub struct DatadogLogsConfig {
51    #[serde(flatten)]
52    pub local_dd_common: LocalDatadogCommonConfig,
53
54    #[configurable(derived)]
55    #[serde(default)]
56    pub compression: Option<Compression>,
57
58    #[configurable(derived)]
59    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
60    pub encoding: Transformer,
61
62    #[configurable(derived)]
63    #[serde(default)]
64    pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,
65
66    #[configurable(derived)]
67    #[serde(default)]
68    pub request: RequestConfig,
69
70    /// When enabled this sink will normalize events to conform to the Datadog Agent standard. This
71    /// also sends requests to the logs backend with the `DD-PROTOCOL: agent-json` header. This bool
72    /// will be overidden as `true` if this header has already been set in the request.headers
73    /// configuration setting.
74    #[serde(default)]
75    pub conforms_as_agent: bool,
76}
77
78impl GenerateConfig for DatadogLogsConfig {
79    fn generate_config() -> toml::Value {
80        toml::from_str(indoc! {r#"
81            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
82        "#})
83        .unwrap()
84    }
85}
86
87impl DatadogLogsConfig {
88    // TODO: We should probably hoist this type of base URI generation so that all DD sinks can
89    // utilize it, since it all follows the same pattern.
90    fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
91        let base_url = dd_common
92            .endpoint
93            .clone()
94            .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
95
96        http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
97    }
98
99    pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
100        self.get_uri(dd_common)
101            .scheme_str()
102            .unwrap_or("http")
103            .to_string()
104    }
105
106    pub fn build_processor(
107        &self,
108        dd_common: &DatadogCommonConfig,
109        client: HttpClient,
110        dd_evp_origin: String,
111    ) -> crate::Result<VectorSink> {
112        let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
113        let request_limits = self.request.tower.into_settings();
114
115        // We forcefully cap the provided batch configuration to the size/log line limits imposed by
116        // the Datadog Logs API, but we still allow them to be lowered if need be.
117        let batch = self
118            .batch
119            .validate()?
120            .limit_max_bytes(BATCH_GOAL_BYTES)?
121            .limit_max_events(BATCH_MAX_EVENTS)?
122            .into_batcher_settings()?;
123
124        let headers = {
125            let mut request_headers = self.request.headers.clone();
126            if self.conforms_as_agent {
127                request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
128            }
129            request_headers
130        };
131
132        // conforms_as_agent is true if either the user supplied configuration parameter is enabled
133        // or the DD-PROTOCOL: agent-json header had already been manually set
134        let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
135            value == "agent-json"
136        } else {
137            false
138        };
139
140        let service = ServiceBuilder::new()
141            .settings(request_limits, LogApiRetry)
142            .service(LogApiService::new(
143                client,
144                self.get_uri(dd_common),
145                headers,
146                dd_evp_origin,
147            )?);
148
149        let encoding = self.encoding.clone();
150        let protocol = self.get_protocol(dd_common);
151
152        let sink = LogSinkBuilder::new(
153            encoding,
154            service,
155            default_api_key,
156            batch,
157            protocol,
158            conforms_as_agent,
159        )
160        .compression(self.compression.unwrap_or_default())
161        .build();
162
163        Ok(VectorSink::from_event_streamsink(sink))
164    }
165
166    pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
167        let default_tls_config;
168
169        let tls_settings = MaybeTlsSettings::from_config(
170            Some(match self.local_dd_common.tls.as_ref() {
171                Some(config) => config,
172                None => {
173                    default_tls_config = TlsEnableableConfig::enabled();
174                    &default_tls_config
175                }
176            }),
177            false,
178        )?;
179        Ok(HttpClient::new(tls_settings, proxy)?)
180    }
181}
182
183#[async_trait::async_trait]
184#[typetag::serde(name = "datadog_logs")]
185impl SinkConfig for DatadogLogsConfig {
186    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
187        let client = self.create_client(&cx.proxy)?;
188        let global = cx.extra_context.get_or_default::<datadog::Options>();
189        let dd_common = self.local_dd_common.with_globals(global)?;
190
191        let healthcheck = dd_common.build_healthcheck(client.clone())?;
192
193        let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
194
195        Ok((sink, healthcheck))
196    }
197
198    fn input(&self) -> Input {
199        let requirement = schema::Requirement::empty()
200            .optional_meaning(meaning::MESSAGE, Kind::bytes())
201            .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
202            .optional_meaning(meaning::HOST, Kind::bytes())
203            .optional_meaning(meaning::SOURCE, Kind::bytes())
204            .optional_meaning(meaning::SEVERITY, Kind::bytes())
205            .optional_meaning(meaning::SERVICE, Kind::bytes())
206            .optional_meaning(meaning::TRACE_ID, Kind::bytes());
207
208        Input::log().with_schema_requirement(requirement)
209    }
210
211    fn acknowledgements(&self) -> &AcknowledgementsConfig {
212        &self.local_dd_common.acknowledgements
213    }
214}
215
#[cfg(test)]
mod test {
    use super::*;
    use crate::codecs::EncodingConfigWithFraming;
    use crate::components::validation::prelude::*;
    use vector_lib::{
        codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
        config::LogNamespace,
    };

    /// The generated example configuration must be accepted by the sink's own config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    impl ValidatableComponent for DatadogLogsConfig {
        /// Describes how the component validation framework should exercise this sink: pointed
        /// at a local HTTP endpoint that receives JSON-encoded pushes.
        fn validation_configuration() -> ValidationConfiguration {
            let endpoint = String::from("http://127.0.0.1:9005");

            // The external resource listens on the same path the sink posts to.
            let logs_uri = http::Uri::try_from(format!("{endpoint}/api/v2/logs"))
                .expect("should not fail to parse URI");

            let sink_config = Self {
                local_dd_common: LocalDatadogCommonConfig {
                    endpoint: Some(endpoint),
                    default_api_key: Some("unused".to_string().into()),
                    ..Default::default()
                },
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding = EncodingConfigWithFraming::new(
                None,
                serializer.into(),
                sink_config.encoding.clone(),
            );

            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(logs_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}