vector/sinks/datadog/logs/
config.rs1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5use vector_lib::{
6 config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
7};
8use vrl::value::Kind;
9
10use super::{service::LogApiRetry, sink::LogSinkBuilder};
11use crate::{
12 common::datadog,
13 http::HttpClient,
14 schema,
15 sinks::{
16 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig, logs::service::LogApiService},
17 prelude::*,
18 util::http::RequestConfig,
19 },
20 tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
/// Upper bound, in bytes, on a single payload.
///
/// This constant is not referenced by the rest of this module's visible code.
// NOTE(review): presumably mirrors the Datadog logs intake limit on uncompressed
// payloads — confirm against the API documentation.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
/// Byte-size goal for a batch: used as the default `MAX_BYTES` batch setting and
/// as the ceiling passed to `limit_max_bytes` when the sink is built.
// NOTE(review): it is lower than `MAX_PAYLOAD_BYTES`, presumably to leave
// headroom for encoding overhead — confirm.
pub const BATCH_GOAL_BYTES: usize = 4_250_000;
/// Maximum number of events in a batch: used as the default `MAX_EVENTS` batch
/// setting and as the ceiling passed to `limit_max_events` when the sink is built.
pub const BATCH_MAX_EVENTS: usize = 1_000;
/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
34
/// Marker type that carries the default batch settings of the `datadog_logs`
/// sink; it holds no data and is used as the type parameter of `BatchConfig`.
#[derive(Clone, Copy, Debug, Default)]
pub struct DatadogLogsDefaultBatchSettings;
37
/// Default batching limits for the `datadog_logs` sink, taken from the
/// module-level batch constants.
impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
    // Default cap on the number of events per batch.
    const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
    // Default cap on the byte size of a batch.
    const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
}
43
#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
/// Configuration for the `datadog_logs` sink.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogLogsConfig {
    // Datadog options local to this sink. Flattened, so its keys (for example
    // `default_api_key`) sit at the top level of the sink configuration. This
    // module reads its `tls` and `acknowledgements` settings and merges it with
    // the global Datadog options when the sink is built.
    #[serde(flatten)]
    pub local_dd_common: LocalDatadogCommonConfig,

    // Request body compression. Defaults to `default_compression()` both when
    // deserialized and when constructed through `Default`.
    #[configurable(derived)]
    #[derivative(Default(value = "default_compression()"))]
    #[serde(default = "default_compression")]
    pub compression: Option<Compression>,

    // Event transformer handed to the sink builder; omitted from serialized
    // output while it equals its default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Batching options; defaults come from `DatadogLogsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,

    // Request options. This module uses its `tower` settings for the service
    // limits and its `headers` as the base set of request headers.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    /// When enabled, requests are sent with the `DD-PROTOCOL: agent-json` header.
    ///
    /// The sink is also treated as conforming to the agent protocol whenever that header is
    /// already set to `agent-json` in the `request.headers` setting, regardless of this option.
    // NOTE(review): presumably the sink also normalizes events to the Datadog
    // Agent format when this is in effect — confirm in the sink implementation.
    #[serde(default)]
    pub conforms_as_agent: bool,
}
77
78const fn default_compression() -> Option<Compression> {
79 Some(Compression::zstd_default())
80}
81
82impl GenerateConfig for DatadogLogsConfig {
83 fn generate_config() -> toml::Value {
84 toml::from_str(indoc! {r#"
85 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
86 "#})
87 .unwrap()
88 }
89}
90
91impl DatadogLogsConfig {
92 fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
95 let base_url = dd_common
96 .endpoint
97 .clone()
98 .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
99
100 http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
101 }
102
103 pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
104 self.get_uri(dd_common)
105 .scheme_str()
106 .unwrap_or("http")
107 .to_string()
108 }
109
110 pub fn build_processor(
111 &self,
112 dd_common: &DatadogCommonConfig,
113 client: HttpClient,
114 dd_evp_origin: String,
115 ) -> crate::Result<VectorSink> {
116 let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
117 let request_limits = self.request.tower.into_settings();
118
119 let batch = self
122 .batch
123 .validate()?
124 .limit_max_bytes(BATCH_GOAL_BYTES)?
125 .limit_max_events(BATCH_MAX_EVENTS)?
126 .into_batcher_settings()?;
127
128 let headers = {
129 let mut request_headers = self.request.headers.clone();
130 if self.conforms_as_agent {
131 request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
132 }
133 request_headers
134 };
135
136 let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
139 value == "agent-json"
140 } else {
141 false
142 };
143
144 let service = ServiceBuilder::new()
145 .settings(request_limits, LogApiRetry)
146 .service(LogApiService::new(
147 client,
148 self.get_uri(dd_common),
149 headers,
150 dd_evp_origin,
151 )?);
152
153 let encoding = self.encoding.clone();
154 let protocol = self.get_protocol(dd_common);
155
156 let sink = LogSinkBuilder::new(
157 encoding,
158 service,
159 default_api_key,
160 batch,
161 protocol,
162 conforms_as_agent,
163 )
164 .compression(self.compression.or_else(default_compression).unwrap())
165 .build();
166
167 Ok(VectorSink::from_event_streamsink(sink))
168 }
169
170 pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
171 let default_tls_config;
172
173 let tls_settings = MaybeTlsSettings::from_config(
174 Some(match self.local_dd_common.tls.as_ref() {
175 Some(config) => config,
176 None => {
177 default_tls_config = TlsEnableableConfig::enabled();
178 &default_tls_config
179 }
180 }),
181 false,
182 )?;
183 Ok(HttpClient::new(tls_settings, proxy)?)
184 }
185}
186
187#[async_trait::async_trait]
188#[typetag::serde(name = "datadog_logs")]
189impl SinkConfig for DatadogLogsConfig {
190 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
191 let client = self.create_client(&cx.proxy)?;
192 let global = cx.extra_context.get_or_default::<datadog::Options>();
193 let dd_common = self.local_dd_common.with_globals(global)?;
194
195 let healthcheck = dd_common.build_healthcheck(client.clone())?;
196
197 let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
198
199 Ok((sink, healthcheck))
200 }
201
202 fn input(&self) -> Input {
203 let requirement = schema::Requirement::empty()
204 .optional_meaning(meaning::MESSAGE, Kind::bytes())
205 .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
206 .optional_meaning(meaning::HOST, Kind::bytes())
207 .optional_meaning(meaning::SOURCE, Kind::bytes())
208 .optional_meaning(meaning::SEVERITY, Kind::bytes())
209 .optional_meaning(meaning::SERVICE, Kind::bytes())
210 .optional_meaning(meaning::TRACE_ID, Kind::bytes());
211
212 Input::log().with_schema_requirement(requirement)
213 }
214
215 fn acknowledgements(&self) -> &AcknowledgementsConfig {
216 &self.local_dd_common.acknowledgements
217 }
218}
219
#[cfg(test)]
mod test {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::{codecs::EncodingConfigWithFraming, components::validation::prelude::*};

    /// Deserializes a sink configuration from YAML, panicking on invalid input.
    fn parse_yaml(source: &str) -> DatadogLogsConfig {
        serde_yaml::from_str(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    #[test]
    fn compression_config_field() {
        // The default helper itself yields zstd.
        assert_eq!(default_compression(), Some(Compression::zstd_default()));

        // Leaving `compression` out falls back to the zstd default.
        let unspecified = parse_yaml(indoc! {r#"
            default_api_key: "test_key"
        "#});
        assert!(matches!(unspecified.compression, Some(Compression::Zstd(_))));

        // Compression can be switched off explicitly.
        let disabled = parse_yaml(indoc! {r#"
            default_api_key: "test_key"
            compression: "none"
        "#});
        assert_eq!(disabled.compression, Some(Compression::None));

        // Explicit zstd.
        let zstd = parse_yaml(indoc! {r#"
            default_api_key: "test_key"
            compression: "zstd"
        "#});
        assert!(matches!(zstd.compression, Some(Compression::Zstd(_))));

        // Explicit gzip.
        let gzip = parse_yaml(indoc! {r#"
            default_api_key: "test_key"
            compression: "gzip"
        "#});
        assert!(matches!(gzip.compression, Some(Compression::Gzip(_))));
    }

    impl ValidatableComponent for DatadogLogsConfig {
        /// Describes how the component validation framework should run this
        /// sink: uncompressed output pushed to a local HTTP resource.
        fn validation_configuration() -> ValidationConfiguration {
            let base_endpoint = String::from("http://127.0.0.1:9005");
            // Computed before `base_endpoint` is moved into the sink config.
            let logs_endpoint = format!("{base_endpoint}/api/v2/logs");

            let local_dd_common = LocalDatadogCommonConfig {
                endpoint: Some(base_endpoint),
                default_api_key: Some(String::from("unused").into()),
                ..Default::default()
            };
            let sink_config = Self {
                local_dd_common,
                compression: Some(Compression::None),
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding = EncodingConfigWithFraming::new(
                None,
                serializer.into(),
                sink_config.encoding.clone(),
            );

            let resource_uri =
                http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(resource_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}