vector/sinks/datadog/logs/
config.rs1use std::{convert::TryFrom, sync::Arc};
2
3use indoc::indoc;
4use tower::ServiceBuilder;
5
6use vector_lib::{
7 config::proxy::ProxyConfig, configurable::configurable_component, schema::meaning,
8};
9use vrl::value::Kind;
10
11use crate::{
12 common::datadog,
13 http::HttpClient,
14 schema,
15 sinks::{
16 datadog::{logs::service::LogApiService, DatadogCommonConfig, LocalDatadogCommonConfig},
17 prelude::*,
18 util::http::RequestConfig,
19 },
20 tls::{MaybeTlsSettings, TlsEnableableConfig},
21};
22
23use super::{service::LogApiRetry, sink::LogSinkBuilder};
24
25pub const MAX_PAYLOAD_BYTES: usize = 5_000_000;
33pub const BATCH_GOAL_BYTES: usize = 4_250_000;
34pub const BATCH_MAX_EVENTS: usize = 1_000;
35pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 5.0;
36
37#[derive(Clone, Copy, Debug, Default)]
38pub struct DatadogLogsDefaultBatchSettings;
39
40impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {
41 const MAX_EVENTS: Option<usize> = Some(BATCH_MAX_EVENTS);
42 const MAX_BYTES: Option<usize> = Some(BATCH_GOAL_BYTES);
43 const TIMEOUT_SECS: f64 = BATCH_DEFAULT_TIMEOUT_SECS;
44}
45
46#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
48#[derive(Clone, Debug, Default)]
49#[serde(deny_unknown_fields)]
50pub struct DatadogLogsConfig {
51 #[serde(flatten)]
52 pub local_dd_common: LocalDatadogCommonConfig,
53
54 #[configurable(derived)]
55 #[serde(default)]
56 pub compression: Option<Compression>,
57
58 #[configurable(derived)]
59 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
60 pub encoding: Transformer,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub batch: BatchConfig<DatadogLogsDefaultBatchSettings>,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub request: RequestConfig,
69
70 #[serde(default)]
75 pub conforms_as_agent: bool,
76}
77
78impl GenerateConfig for DatadogLogsConfig {
79 fn generate_config() -> toml::Value {
80 toml::from_str(indoc! {r#"
81 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
82 "#})
83 .unwrap()
84 }
85}
86
87impl DatadogLogsConfig {
88 fn get_uri(&self, dd_common: &DatadogCommonConfig) -> http::Uri {
91 let base_url = dd_common
92 .endpoint
93 .clone()
94 .unwrap_or_else(|| format!("https://http-intake.logs.{}", dd_common.site));
95
96 http::Uri::try_from(format!("{base_url}/api/v2/logs")).expect("URI not valid")
97 }
98
99 pub fn get_protocol(&self, dd_common: &DatadogCommonConfig) -> String {
100 self.get_uri(dd_common)
101 .scheme_str()
102 .unwrap_or("http")
103 .to_string()
104 }
105
106 pub fn build_processor(
107 &self,
108 dd_common: &DatadogCommonConfig,
109 client: HttpClient,
110 dd_evp_origin: String,
111 ) -> crate::Result<VectorSink> {
112 let default_api_key: Arc<str> = Arc::from(dd_common.default_api_key.inner());
113 let request_limits = self.request.tower.into_settings();
114
115 let batch = self
118 .batch
119 .validate()?
120 .limit_max_bytes(BATCH_GOAL_BYTES)?
121 .limit_max_events(BATCH_MAX_EVENTS)?
122 .into_batcher_settings()?;
123
124 let headers = {
125 let mut request_headers = self.request.headers.clone();
126 if self.conforms_as_agent {
127 request_headers.insert(String::from("DD-PROTOCOL"), String::from("agent-json"));
128 }
129 request_headers
130 };
131
132 let conforms_as_agent = if let Some(value) = headers.get("DD-PROTOCOL") {
135 value == "agent-json"
136 } else {
137 false
138 };
139
140 let service = ServiceBuilder::new()
141 .settings(request_limits, LogApiRetry)
142 .service(LogApiService::new(
143 client,
144 self.get_uri(dd_common),
145 headers,
146 dd_evp_origin,
147 )?);
148
149 let encoding = self.encoding.clone();
150 let protocol = self.get_protocol(dd_common);
151
152 let sink = LogSinkBuilder::new(
153 encoding,
154 service,
155 default_api_key,
156 batch,
157 protocol,
158 conforms_as_agent,
159 )
160 .compression(self.compression.unwrap_or_default())
161 .build();
162
163 Ok(VectorSink::from_event_streamsink(sink))
164 }
165
166 pub fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
167 let default_tls_config;
168
169 let tls_settings = MaybeTlsSettings::from_config(
170 Some(match self.local_dd_common.tls.as_ref() {
171 Some(config) => config,
172 None => {
173 default_tls_config = TlsEnableableConfig::enabled();
174 &default_tls_config
175 }
176 }),
177 false,
178 )?;
179 Ok(HttpClient::new(tls_settings, proxy)?)
180 }
181}
182
183#[async_trait::async_trait]
184#[typetag::serde(name = "datadog_logs")]
185impl SinkConfig for DatadogLogsConfig {
186 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
187 let client = self.create_client(&cx.proxy)?;
188 let global = cx.extra_context.get_or_default::<datadog::Options>();
189 let dd_common = self.local_dd_common.with_globals(global)?;
190
191 let healthcheck = dd_common.build_healthcheck(client.clone())?;
192
193 let sink = self.build_processor(&dd_common, client, cx.app_name_slug)?;
194
195 Ok((sink, healthcheck))
196 }
197
198 fn input(&self) -> Input {
199 let requirement = schema::Requirement::empty()
200 .optional_meaning(meaning::MESSAGE, Kind::bytes())
201 .optional_meaning(meaning::TIMESTAMP, Kind::timestamp())
202 .optional_meaning(meaning::HOST, Kind::bytes())
203 .optional_meaning(meaning::SOURCE, Kind::bytes())
204 .optional_meaning(meaning::SEVERITY, Kind::bytes())
205 .optional_meaning(meaning::SERVICE, Kind::bytes())
206 .optional_meaning(meaning::TRACE_ID, Kind::bytes());
207
208 Input::log().with_schema_requirement(requirement)
209 }
210
211 fn acknowledgements(&self) -> &AcknowledgementsConfig {
212 &self.local_dd_common.acknowledgements
213 }
214}
215
216#[cfg(test)]
217mod test {
218 use super::*;
219 use crate::codecs::EncodingConfigWithFraming;
220 use crate::components::validation::prelude::*;
221 use vector_lib::{
222 codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
223 config::LogNamespace,
224 };
225
226 #[test]
227 fn generate_config() {
228 crate::test_util::test_generate_config::<DatadogLogsConfig>();
229 }
230
231 impl ValidatableComponent for DatadogLogsConfig {
232 fn validation_configuration() -> ValidationConfiguration {
233 let endpoint = "http://127.0.0.1:9005".to_string();
234 let config = Self {
235 local_dd_common: LocalDatadogCommonConfig {
236 endpoint: Some(endpoint.clone()),
237 default_api_key: Some("unused".to_string().into()),
238 ..Default::default()
239 },
240 ..Default::default()
241 };
242
243 let encoding = EncodingConfigWithFraming::new(
244 None,
245 JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
246 .into(),
247 config.encoding.clone(),
248 );
249
250 let logs_endpoint = format!("{endpoint}/api/v2/logs");
251
252 let external_resource = ExternalResource::new(
253 ResourceDirection::Push,
254 HttpResourceConfig::from_parts(
255 http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI"),
256 None,
257 ),
258 encoding,
259 );
260
261 ValidationConfiguration::from_sink(
262 Self::NAME,
263 LogNamespace::Legacy,
264 vec![ComponentTestCaseConfig::from_sink(
265 config,
266 None,
267 Some(external_resource),
268 )],
269 )
270 }
271 }
272
273 register_validatable_component!(DatadogLogsConfig);
274}