vector/sinks/datadog/logs/
config.rs1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5use vector_lib::{
6 config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
7};
8use vrl::value::Kind;
9
10use super::{service::LogApiRetry, sink::LogSinkBuilder};
11use crate::{
12 common::datadog,
13 http::HttpClient,
14 schema,
15 sinks::{
16 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig, logs::service::LogApiService},
17 prelude::*,
18 util::http::RequestConfig,
19 },
20 tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
/// Maximum size of a single payload, in bytes.
//
// NOTE(review): presumably mirrors the 5 MB uncompressed-payload limit documented for the
// Datadog Logs intake API -- confirm against the API reference. Within this section it only
// serves as the baseline that `BATCH_GOAL_BYTES` is derived from.
pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;

/// Byte size a batch aims for: 750,000 bytes below [`MAX_PAYLOAD_BYTES`] (4,250,000).
///
/// Used both as the default `max_bytes` batch setting and as the limit handed to
/// `limit_max_bytes` when the sink is built.
pub const BATCH_GOAL_BYTES: usize = MAX_PAYLOAD_BYTES - 750_000;

/// Number of events per batch.
///
/// Used both as the default `max_events` batch setting and as the limit handed to
/// `limit_max_events` when the sink is built.
pub const BATCH_MAX_EVENTS: usize = 1_000;

/// Default batch timeout, in seconds.
pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
34
35#[derive(Clone, Copy, Debug, Default)]
36pub struct DatadogLogsDefaultBatchSettings;
37
38impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
39 const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
40 const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
41 const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
42}
43
44#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
46#[derive(Clone, Debug, Default)]
47#[serde(deny_unknown_fields)]
48pub struct DatadogLogsConfig {
49 #[serde(flatten)]
50 pub local_dd_common: LocalDatadogCommonConfig,
51
52 #[configurable(derived)]
53 #[serde(default)]
54 pub compression: Option<Compression>,
55
56 #[configurable(derived)]
57 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
58 pub encoding: Transformer,
59
60 #[configurable(derived)]
61 #[serde(default)]
62 pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,
63
64 #[configurable(derived)]
65 #[serde(default)]
66 pub request: RequestConfig,
67
68 #[serde(default)]
73 pub conforms_as_agent: bool,
74}
75
76impl GenerateConfig for DatadogLogsConfig {
77 fn generate_config() -> toml::Value {
78 toml::from_str(indoc! {r#"
79 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
80 "#})
81 .unwrap()
82 }
83}
84
85impl DatadogLogsConfig {
86 fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
89 let base_url = dd_common
90 .endpoint
91 .clone()
92 .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
93
94 http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
95 }
96
97 pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
98 self.get_uri(dd_common)
99 .scheme_str()
100 .unwrap_or("http")
101 .to_string()
102 }
103
104 pub fn build_processor(
105 &self,
106 dd_common: &DatadogCommonConfig,
107 client: HttpClient,
108 dd_evp_origin: String,
109 ) -> crate::Result<VectorSink> {
110 let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
111 let request_limits = self.request.tower.into_settings();
112
113 let batch = self
116 .batch
117 .validate()?
118 .limit_max_bytes(BATCH_GOAL_BYTES)?
119 .limit_max_events(BATCH_MAX_EVENTS)?
120 .into_batcher_settings()?;
121
122 let headers = {
123 let mut request_headers = self.request.headers.clone();
124 if self.conforms_as_agent {
125 request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
126 }
127 request_headers
128 };
129
130 let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
133 value == "agent-json"
134 } else {
135 false
136 };
137
138 let service = ServiceBuilder::new()
139 .settings(request_limits, LogApiRetry)
140 .service(LogApiService::new(
141 client,
142 self.get_uri(dd_common),
143 headers,
144 dd_evp_origin,
145 )?);
146
147 let encoding = self.encoding.clone();
148 let protocol = self.get_protocol(dd_common);
149
150 let sink = LogSinkBuilder::new(
151 encoding,
152 service,
153 default_api_key,
154 batch,
155 protocol,
156 conforms_as_agent,
157 )
158 .compression(self.compression.unwrap_or_default())
159 .build();
160
161 Ok(VectorSink::from_event_streamsink(sink))
162 }
163
164 pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
165 let default_tls_config;
166
167 let tls_settings = MaybeTlsSettings::from_config(
168 Some(match self.local_dd_common.tls.as_ref() {
169 Some(config) => config,
170 None => {
171 default_tls_config = TlsEnableableConfig::enabled();
172 &default_tls_config
173 }
174 }),
175 false,
176 )?;
177 Ok(HttpClient::new(tls_settings, proxy)?)
178 }
179}
180
181#[async_trait::async_trait]
182#[typetag::serde(name = "datadog_logs")]
183impl SinkConfig for DatadogLogsConfig {
184 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
185 let client = self.create_client(&cx.proxy)?;
186 let global = cx.extra_context.get_or_default::<datadog::Options>();
187 let dd_common = self.local_dd_common.with_globals(global)?;
188
189 let healthcheck = dd_common.build_healthcheck(client.clone())?;
190
191 let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
192
193 Ok((sink, healthcheck))
194 }
195
196 fn input(&self) -> Input {
197 let requirement = schema::Requirement::empty()
198 .optional_meaning(meaning::MESSAGE, Kind::bytes())
199 .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
200 .optional_meaning(meaning::HOST, Kind::bytes())
201 .optional_meaning(meaning::SOURCE, Kind::bytes())
202 .optional_meaning(meaning::SEVERITY, Kind::bytes())
203 .optional_meaning(meaning::SERVICE, Kind::bytes())
204 .optional_meaning(meaning::TRACE_ID, Kind::bytes());
205
206 Input::log().with_schema_requirement(requirement)
207 }
208
209 fn acknowledgements(&self) -> &AcknowledgementsConfig {
210 &self.local_dd_common.acknowledgements
211 }
212}
213
#[cfg(test)]
mod test {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::{codecs::EncodingConfigWithFraming, components::validation::prelude::*};

    /// Runs the shared `test_generate_config` check against `DatadogLogsConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogLogsConfig>();
    }

    impl ValidatableComponent for DatadogLogsConfig {
        /// Describes how the component validation harness should exercise this sink: one test
        /// case that pushes JSON-encoded events to an HTTP resource at
        /// `http://127.0.0.1:9005/api/v2/logs`.
        fn validation_configuration() -> ValidationConfiguration {
            let base_endpoint = String::from("http://127.0.0.1:9005");
            // Computed before `base_endpoint` is moved into the sink configuration below.
            let logs_endpoint = format!("{base_endpoint}/api/v2/logs");

            let local_dd_common = LocalDatadogCommonConfig {
                endpoint: Some(base_endpoint),
                default_api_key: Some(String::from("unused").into()),
                ..Default::default()
            };
            let sink_config = Self {
                local_dd_common,
                ..Default::default()
            };

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());
            let encoding = EncodingConfigWithFraming::new(
                None,
                serializer.into(),
                sink_config.encoding.clone(),
            );

            let logs_uri =
                http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(logs_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(DatadogLogsConfig);
}