vector/sinks/splunk_hec/logs/
config.rs1use std::sync::Arc;
2
3use vector_lib::{
4 codecs::TextSerializerConfig,
5 lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
6 sensitive_string::SensitiveString,
7};
8
9use crate::{
10 http::HttpClient,
11 sinks::{
12 prelude::*,
13 splunk_hec::common::{
14 acknowledgements::HecClientAcknowledgementsConfig,
15 build_healthcheck, build_http_batch_service, create_client,
16 service::{HecService, HttpRequestBuilder},
17 EndpointTarget, SplunkHecDefaultBatchSettings,
18 },
19 util::http::HttpRetryLogic,
20 },
21};
22
23use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
24
25#[configurable_component(sink(
27 "splunk_hec_logs",
28 "Deliver log data to Splunk's HTTP Event Collector."
29))]
30#[derive(Clone, Debug)]
31#[serde(deny_unknown_fields)]
32pub struct HecLogsSinkConfig {
33 #[serde(alias = "token")]
37 pub default_token: SensitiveString,
38
39 #[configurable(metadata(
46 docs::examples = "https://http-inputs-hec.splunkcloud.com",
47 docs::examples = "https://hec.splunk.com:8088",
48 docs::examples = "http://example.com"
49 ))]
50 #[configurable(validation(format = "uri"))]
51 pub endpoint: String,
52
53 #[configurable(metadata(docs::advanced))]
63 pub host_key: Option<OptionalTargetPath>,
64
65 #[configurable(metadata(docs::advanced))]
69 #[serde(default)]
70 #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
71 pub indexed_fields: Vec<ConfigValuePath>,
72
73 #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
77 pub index: Option<Template>,
78
79 #[configurable(metadata(docs::advanced))]
83 #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
84 pub sourcetype: Option<Template>,
85
86 #[configurable(metadata(docs::advanced))]
92 #[configurable(metadata(
93 docs::examples = "{{ file }}",
94 docs::examples = "/var/log/syslog",
95 docs::examples = "UDP:514"
96 ))]
97 pub source: Option<Template>,
98
99 #[configurable(derived)]
100 pub encoding: EncodingConfig,
101
102 #[configurable(derived)]
103 #[serde(default)]
104 pub compression: Compression,
105
106 #[configurable(derived)]
107 #[serde(default)]
108 pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
109
110 #[configurable(derived)]
111 #[serde(default)]
112 pub request: TowerRequestConfig,
113
114 #[configurable(derived)]
115 pub tls: Option<TlsConfig>,
116
117 #[configurable(derived)]
118 #[serde(default)]
119 pub acknowledgements: HecClientAcknowledgementsConfig,
120
121 #[serde(skip)]
124 pub timestamp_nanos_key: Option<String>,
125
126 #[configurable(metadata(docs::advanced))]
134 #[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
135 pub timestamp_key: Option<OptionalTargetPath>,
139
140 #[serde(default)]
149 pub auto_extract_timestamp: Option<bool>,
150
151 #[configurable(derived)]
152 #[configurable(metadata(docs::advanced))]
153 #[serde(default = "default_endpoint_target")]
154 pub endpoint_target: EndpointTarget,
155}
156
157const fn default_endpoint_target() -> EndpointTarget {
158 EndpointTarget::Event
159}
160
161impl GenerateConfig for HecLogsSinkConfig {
162 fn generate_config() -> toml::Value {
163 toml::Value::try_from(Self {
164 default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
165 endpoint: "endpoint".to_owned(),
166 host_key: None,
167 indexed_fields: vec![],
168 index: None,
169 sourcetype: None,
170 source: None,
171 encoding: TextSerializerConfig::default().into(),
172 compression: Compression::default(),
173 batch: BatchConfig::default(),
174 request: TowerRequestConfig::default(),
175 tls: None,
176 acknowledgements: Default::default(),
177 timestamp_nanos_key: None,
178 timestamp_key: None,
179 auto_extract_timestamp: None,
180 endpoint_target: EndpointTarget::Event,
181 })
182 .unwrap()
183 }
184}
185
186#[async_trait::async_trait]
187#[typetag::serde(name = "splunk_hec_logs")]
188impl SinkConfig for HecLogsSinkConfig {
189 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
190 if self.auto_extract_timestamp.is_some() && self.endpoint_target == EndpointTarget::Raw {
191 return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
192 }
193
194 let client = create_client(self.tls.as_ref(), cx.proxy())?;
195 let healthcheck = build_healthcheck(
196 self.endpoint.clone(),
197 self.default_token.inner().to_owned(),
198 client.clone(),
199 )
200 .boxed();
201 let sink = self.build_processor(client, cx)?;
202
203 Ok((sink, healthcheck))
204 }
205
206 fn input(&self) -> Input {
207 Input::new(self.encoding.config().input_type() & DataType::Log)
208 }
209
210 fn acknowledgements(&self) -> &AcknowledgementsConfig {
211 &self.acknowledgements.inner
212 }
213}
214
215impl HecLogsSinkConfig {
216 pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
217 let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
218 Some(client.clone())
219 } else {
220 None
221 };
222
223 let transformer = self.encoding.transformer();
224 let serializer = self.encoding.build()?;
225 let encoder = Encoder::<()>::new(serializer);
226 let encoder = HecLogsEncoder {
227 transformer,
228 encoder,
229 auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
230 };
231 let request_builder = HecLogsRequestBuilder {
232 encoder,
233 compression: self.compression,
234 };
235
236 let request_settings = self.request.into_settings();
237 let http_request_builder = Arc::new(HttpRequestBuilder::new(
238 self.endpoint.clone(),
239 self.endpoint_target,
240 self.default_token.inner().to_owned(),
241 self.compression,
242 ));
243 let http_service = ServiceBuilder::new()
244 .settings(request_settings, HttpRetryLogic::default())
245 .service(build_http_batch_service(
246 client,
247 Arc::clone(&http_request_builder),
248 self.endpoint_target,
249 self.auto_extract_timestamp.unwrap_or_default(),
250 ));
251
252 let service = HecService::new(
253 http_service,
254 ack_client,
255 http_request_builder,
256 self.acknowledgements.clone(),
257 );
258
259 let batch_settings = self.batch.into_batcher_settings()?;
260
261 let sink = HecLogsSink {
262 service,
263 request_builder,
264 batch_settings,
265 sourcetype: self.sourcetype.clone(),
266 source: self.source.clone(),
267 index: self.index.clone(),
268 indexed_fields: self
269 .indexed_fields
270 .iter()
271 .map(|config_path| config_path.0.clone())
272 .collect(),
273 host_key: self.host_key.clone(),
274 timestamp_nanos_key: self.timestamp_nanos_key.clone(),
275 timestamp_key: self.timestamp_key.clone(),
276 endpoint_target: self.endpoint_target,
277 auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
278 };
279
280 Ok(VectorSink::from_event_streamsink(sink))
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::components::validation::prelude::*;
    use vector_lib::{
        codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
        config::LogNamespace,
    };

    /// Checks that the generated example configuration is valid for this sink.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HecLogsSinkConfig>();
    }

    impl ValidatableComponent for HecLogsSinkConfig {
        /// Describes a component-validation run: a JSON-encoding sink that targets the `raw`
        /// endpoint of a local address, flushing one event per batch with no retries.
        fn validation_configuration() -> ValidationConfiguration {
            let base_url = String::from("http://127.0.0.1:9001");
            // The external resource listens on the raw collector path under the base URL.
            let raw_collector_url = format!("{base_url}/services/collector/raw");

            // One event per batch.
            let mut single_event_batch = BatchConfig::default();
            single_event_batch.max_events = Some(1);

            let json_encoding = EncodingConfig::new(
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
                    .into(),
                Transformer::default(),
            );

            let sink_config = Self {
                endpoint: base_url,
                default_token: "i_am_an_island".to_string().into(),
                host_key: None,
                indexed_fields: Vec::new(),
                index: None,
                sourcetype: None,
                source: None,
                encoding: json_encoding,
                compression: Compression::default(),
                batch: single_event_batch,
                request: TowerRequestConfig {
                    timeout_secs: 2,
                    retry_attempts: 0,
                    ..Default::default()
                },
                tls: None,
                acknowledgements: HecClientAcknowledgementsConfig {
                    indexer_acknowledgements_enabled: false,
                    ..Default::default()
                },
                timestamp_nanos_key: None,
                timestamp_key: None,
                auto_extract_timestamp: None,
                endpoint_target: EndpointTarget::Raw,
            };

            let resource_uri =
                http::Uri::try_from(&raw_collector_url).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(resource_uri, None),
                sink_config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HecLogsSinkConfig);
}