vector/sinks/splunk_hec/logs/
config.rs1use std::sync::Arc;
2
3use vector_lib::{
4 codecs::TextSerializerConfig,
5 lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
6 sensitive_string::SensitiveString,
7};
8
9use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
10use crate::{
11 http::HttpClient,
12 sinks::{
13 prelude::*,
14 splunk_hec::common::{
15 EndpointTarget, SplunkHecDefaultBatchSettings,
16 acknowledgements::HecClientAcknowledgementsConfig,
17 build_healthcheck, build_http_batch_service, create_client,
18 service::{HecService, HttpRequestBuilder},
19 },
20 util::http::HttpRetryLogic,
21 },
22};
23
24#[configurable_component(sink(
26 "splunk_hec_logs",
27 "Deliver log data to Splunk's HTTP Event Collector."
28))]
29#[derive(Clone, Debug)]
30#[serde(deny_unknown_fields)]
31pub struct HecLogsSinkConfig {
32 #[serde(alias = "token")]
36 pub default_token: SensitiveString,
37
38 #[configurable(metadata(
45 docs::examples = "https://http-inputs-hec.splunkcloud.com",
46 docs::examples = "https://hec.splunk.com:8088",
47 docs::examples = "http://example.com"
48 ))]
49 #[configurable(validation(format = "uri"))]
50 pub endpoint: String,
51
52 #[configurable(metadata(docs::advanced))]
62 pub host_key: Option<OptionalTargetPath>,
63
64 #[configurable(metadata(docs::advanced))]
68 #[serde(default)]
69 #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
70 pub indexed_fields: Vec<ConfigValuePath>,
71
72 #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
76 pub index: Option<Template>,
77
78 #[configurable(metadata(docs::advanced))]
82 #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
83 pub sourcetype: Option<Template>,
84
85 #[configurable(metadata(docs::advanced))]
91 #[configurable(metadata(
92 docs::examples = "{{ file }}",
93 docs::examples = "/var/log/syslog",
94 docs::examples = "UDP:514"
95 ))]
96 pub source: Option<Template>,
97
98 #[configurable(derived)]
99 pub encoding: EncodingConfig,
100
101 #[configurable(derived)]
102 #[serde(default)]
103 pub compression: Compression,
104
105 #[configurable(derived)]
106 #[serde(default)]
107 pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
108
109 #[configurable(derived)]
110 #[serde(default)]
111 pub request: TowerRequestConfig,
112
113 #[configurable(derived)]
114 pub tls: Option<TlsConfig>,
115
116 #[configurable(derived)]
117 #[serde(default)]
118 pub acknowledgements: HecClientAcknowledgementsConfig,
119
120 #[serde(skip)]
123 pub timestamp_nanos_key: Option<String>,
124
125 #[configurable(metadata(docs::advanced))]
133 #[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
134 pub timestamp_key: Option<OptionalTargetPath>,
138
139 #[serde(default)]
148 pub auto_extract_timestamp: Option<bool>,
149
150 #[configurable(derived)]
151 #[configurable(metadata(docs::advanced))]
152 #[serde(default = "default_endpoint_target")]
153 pub endpoint_target: EndpointTarget,
154}
155
156const fn default_endpoint_target() -> EndpointTarget {
157 EndpointTarget::Event
158}
159
160impl GenerateConfig for HecLogsSinkConfig {
161 fn generate_config() -> toml::Value {
162 toml::Value::try_from(Self {
163 default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
164 endpoint: "endpoint".to_owned(),
165 host_key: None,
166 indexed_fields: vec![],
167 index: None,
168 sourcetype: None,
169 source: None,
170 encoding: TextSerializerConfig::default().into(),
171 compression: Compression::default(),
172 batch: BatchConfig::default(),
173 request: TowerRequestConfig::default(),
174 tls: None,
175 acknowledgements: Default::default(),
176 timestamp_nanos_key: None,
177 timestamp_key: None,
178 auto_extract_timestamp: None,
179 endpoint_target: EndpointTarget::Event,
180 })
181 .unwrap()
182 }
183}
184
185#[async_trait::async_trait]
186#[typetag::serde(name = "splunk_hec_logs")]
187impl SinkConfig for HecLogsSinkConfig {
188 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
189 if self.auto_extract_timestamp.is_some() && self.endpoint_target == EndpointTarget::Raw {
190 return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
191 }
192
193 let client = create_client(self.tls.as_ref(), cx.proxy())?;
194 let healthcheck = build_healthcheck(
195 self.endpoint.clone(),
196 self.default_token.inner().to_owned(),
197 client.clone(),
198 )
199 .boxed();
200 let sink = self.build_processor(client, cx)?;
201
202 Ok((sink, healthcheck))
203 }
204
205 fn input(&self) -> Input {
206 Input::new(self.encoding.config().input_type() & DataType::Log)
207 }
208
209 fn acknowledgements(&self) -> &AcknowledgementsConfig {
210 &self.acknowledgements.inner
211 }
212}
213
214impl HecLogsSinkConfig {
215 pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
216 let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
217 Some(client.clone())
218 } else {
219 None
220 };
221
222 let transformer = self.encoding.transformer();
223 let serializer = self.encoding.build()?;
224 let encoder = Encoder::<()>::new(serializer);
225 let encoder = HecLogsEncoder {
226 transformer,
227 encoder,
228 auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
229 };
230 let request_builder = HecLogsRequestBuilder {
231 encoder,
232 compression: self.compression,
233 };
234
235 let request_settings = self.request.into_settings();
236 let http_request_builder = Arc::new(HttpRequestBuilder::new(
237 self.endpoint.clone(),
238 self.endpoint_target,
239 self.default_token.inner().to_owned(),
240 self.compression,
241 ));
242 let http_service = ServiceBuilder::new()
243 .settings(request_settings, HttpRetryLogic::default())
244 .service(build_http_batch_service(
245 client,
246 Arc::clone(&http_request_builder),
247 self.endpoint_target,
248 self.auto_extract_timestamp.unwrap_or_default(),
249 ));
250
251 let service = HecService::new(
252 http_service,
253 ack_client,
254 http_request_builder,
255 self.acknowledgements.clone(),
256 );
257
258 let batch_settings = self.batch.into_batcher_settings()?;
259
260 let sink = HecLogsSink {
261 service,
262 request_builder,
263 batch_settings,
264 sourcetype: self.sourcetype.clone(),
265 source: self.source.clone(),
266 index: self.index.clone(),
267 indexed_fields: self
268 .indexed_fields
269 .iter()
270 .map(|config_path| config_path.0.clone())
271 .collect(),
272 host_key: self.host_key.clone(),
273 timestamp_nanos_key: self.timestamp_nanos_key.clone(),
274 timestamp_key: self.timestamp_key.clone(),
275 endpoint_target: self.endpoint_target,
276 auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
277 };
278
279 Ok(VectorSink::from_event_streamsink(sink))
280 }
281}
282
#[cfg(test)]
mod tests {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::components::validation::prelude::*;

    // Checks the generated example configuration via the shared test helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HecLogsSinkConfig>();
    }

    impl ValidatableComponent for HecLogsSinkConfig {
        /// Describes a single component-validation case: JSON-encoded events pushed to the
        /// `raw` collector endpoint on a local address, one event per batch.
        fn validation_configuration() -> ValidationConfiguration {
            let base_url = "http://127.0.0.1:9001".to_string();
            // The external resource listens on the raw collector path under the base URL.
            let raw_collector_url = format!("{base_url}/services/collector/raw");

            let mut single_event_batch = BatchConfig::default();
            single_event_batch.max_events = Some(1);

            let json_encoding = EncodingConfig::new(
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
                    .into(),
                Transformer::default(),
            );

            let config = Self {
                endpoint: base_url,
                default_token: "i_am_an_island".to_string().into(),
                host_key: None,
                indexed_fields: vec![],
                index: None,
                sourcetype: None,
                source: None,
                encoding: json_encoding,
                compression: Compression::default(),
                batch: single_event_batch,
                request: TowerRequestConfig {
                    timeout_secs: 2,
                    retry_attempts: 0,
                    ..Default::default()
                },
                tls: None,
                acknowledgements: HecClientAcknowledgementsConfig {
                    indexer_acknowledgements_enabled: false,
                    ..Default::default()
                },
                timestamp_nanos_key: None,
                timestamp_key: None,
                auto_extract_timestamp: None,
                endpoint_target: EndpointTarget::Raw,
            };

            let resource_uri =
                http::Uri::try_from(&raw_collector_url).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(resource_uri, None),
                config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HecLogsSinkConfig);
}