vector/sinks/splunk_hec/logs/
config.rs

1use std::sync::Arc;
2
3use vector_lib::{
4    codecs::TextSerializerConfig,
5    lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
6    sensitive_string::SensitiveString,
7};
8
9use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
10use crate::{
11    http::HttpClient,
12    sinks::{
13        prelude::*,
14        splunk_hec::common::{
15            EndpointTarget, SplunkHecDefaultBatchSettings,
16            acknowledgements::HecClientAcknowledgementsConfig,
17            build_healthcheck, build_http_batch_service, create_client,
18            service::{HecService, HttpRequestBuilder},
19        },
20        util::http::HttpRetryLogic,
21    },
22};
23
24/// Configuration for the `splunk_hec_logs` sink.
25#[configurable_component(sink(
26    "splunk_hec_logs",
27    "Deliver log data to Splunk's HTTP Event Collector."
28))]
29#[derive(Clone, Debug)]
30#[serde(deny_unknown_fields)]
31pub struct HecLogsSinkConfig {
32    /// Default Splunk HEC token.
33    ///
34    /// If an event has a token set in its secrets (`splunk_hec_token`), it prevails over the one set here.
35    #[serde(alias = "token")]
36    pub default_token: SensitiveString,
37
38    /// The base URL of the Splunk instance.
39    ///
40    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
41    /// by the [`Splunk`][splunk] API are used.
42    ///
43    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
44    #[configurable(metadata(
45        docs::examples = "https://http-inputs-hec.splunkcloud.com",
46        docs::examples = "https://hec.splunk.com:8088",
47        docs::examples = "http://example.com"
48    ))]
49    #[configurable(validation(format = "uri"))]
50    pub endpoint: String,
51
52    /// Overrides the name of the log field used to retrieve the hostname to send to Splunk HEC.
53    ///
54    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
55    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
56    ///
57    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
58    // NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between a true
59    //       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
60    //       empty string to a `None` path internally.
61    #[configurable(metadata(docs::advanced))]
62    pub host_key: Option<OptionalTargetPath>,
63
64    /// Fields to be [added to Splunk index][splunk_field_index_docs].
65    ///
66    /// [splunk_field_index_docs]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/IFXandHEC
67    #[configurable(metadata(docs::advanced))]
68    #[serde(default)]
69    #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
70    pub indexed_fields: Vec<ConfigValuePath>,
71
72    /// The name of the index to send events to.
73    ///
74    /// If not specified, the default index defined within Splunk is used.
75    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
76    pub index: Option<Template>,
77
78    /// The sourcetype of events sent to this sink.
79    ///
80    /// If unset, Splunk defaults to `httpevent`.
81    #[configurable(metadata(docs::advanced))]
82    #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
83    pub sourcetype: Option<Template>,
84
85    /// The source of events sent to this sink.
86    ///
87    /// This is typically the filename the logs originated from.
88    ///
89    /// If unset, the Splunk collector sets it.
90    #[configurable(metadata(docs::advanced))]
91    #[configurable(metadata(
92        docs::examples = "{{ file }}",
93        docs::examples = "/var/log/syslog",
94        docs::examples = "UDP:514"
95    ))]
96    pub source: Option<Template>,
97
98    #[configurable(derived)]
99    pub encoding: EncodingConfig,
100
101    #[configurable(derived)]
102    #[serde(default)]
103    pub compression: Compression,
104
105    #[configurable(derived)]
106    #[serde(default)]
107    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
108
109    #[configurable(derived)]
110    #[serde(default)]
111    pub request: TowerRequestConfig,
112
113    #[configurable(derived)]
114    pub tls: Option<TlsConfig>,
115
116    #[configurable(derived)]
117    #[serde(default)]
118    pub acknowledgements: HecClientAcknowledgementsConfig,
119
120    // This settings is relevant only for the `humio_logs` sink and should be left as `None`
121    // everywhere else.
122    #[serde(skip)]
123    pub timestamp_nanos_key: Option<String>,
124
125    /// Overrides the name of the log field used to retrieve the timestamp to send to Splunk HEC.
126    /// When set to `“”`, a timestamp is not set in the events sent to Splunk HEC.
127    ///
128    /// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
129    /// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
130    ///
131    /// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
132    #[configurable(metadata(docs::advanced))]
133    #[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
134    // NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between a true
135    //       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
136    //       empty string to a `None` path internally.
137    pub timestamp_key: Option<OptionalTargetPath>,
138
139    /// Passes the `auto_extract_timestamp` option to Splunk.
140    ///
141    /// This option is only relevant to Splunk v8.x and above, and is only applied when
142    /// `endpoint_target` is set to `event`.
143    ///
144    /// Setting this to `true` causes Splunk to extract the timestamp from the message text
145    /// rather than use the timestamp embedded in the event. The timestamp must be in the format
146    /// `yyyy-mm-dd hh:mm:ss`.
147    #[serde(default)]
148    pub auto_extract_timestamp: Option<bool>,
149
150    #[configurable(derived)]
151    #[configurable(metadata(docs::advanced))]
152    #[serde(default = "default_endpoint_target")]
153    pub endpoint_target: EndpointTarget,
154}
155
156const fn default_endpoint_target() -> EndpointTarget {
157    EndpointTarget::Event
158}
159
160impl GenerateConfig for HecLogsSinkConfig {
161    fn generate_config() -> toml::Value {
162        toml::Value::try_from(Self {
163            default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
164            endpoint: "endpoint".to_owned(),
165            host_key: None,
166            indexed_fields: vec![],
167            index: None,
168            sourcetype: None,
169            source: None,
170            encoding: TextSerializerConfig::default().into(),
171            compression: Compression::default(),
172            batch: BatchConfig::default(),
173            request: TowerRequestConfig::default(),
174            tls: None,
175            acknowledgements: Default::default(),
176            timestamp_nanos_key: None,
177            timestamp_key: None,
178            auto_extract_timestamp: None,
179            endpoint_target: EndpointTarget::Event,
180        })
181        .unwrap()
182    }
183}
184
185#[async_trait::async_trait]
186#[typetag::serde(name = "splunk_hec_logs")]
187impl SinkConfig for HecLogsSinkConfig {
188    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
189        if self.auto_extract_timestamp.is_some() && self.endpoint_target == EndpointTarget::Raw {
190            return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
191        }
192
193        let client = create_client(self.tls.as_ref(), cx.proxy())?;
194        let healthcheck = build_healthcheck(
195            self.endpoint.clone(),
196            self.default_token.inner().to_owned(),
197            client.clone(),
198        )
199        .boxed();
200        let sink = self.build_processor(client, cx)?;
201
202        Ok((sink, healthcheck))
203    }
204
205    fn input(&self) -> Input {
206        Input::new(self.encoding.config().input_type() & DataType::Log)
207    }
208
209    fn acknowledgements(&self) -> &AcknowledgementsConfig {
210        &self.acknowledgements.inner
211    }
212}
213
214impl HecLogsSinkConfig {
215    pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
216        let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
217            Some(client.clone())
218        } else {
219            None
220        };
221
222        let transformer = self.encoding.transformer();
223        let serializer = self.encoding.build()?;
224        let encoder = Encoder::<()>::new(serializer);
225        let encoder = HecLogsEncoder {
226            transformer,
227            encoder,
228            auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
229        };
230        let request_builder = HecLogsRequestBuilder {
231            encoder,
232            compression: self.compression,
233        };
234
235        let request_settings = self.request.into_settings();
236        let http_request_builder = Arc::new(HttpRequestBuilder::new(
237            self.endpoint.clone(),
238            self.endpoint_target,
239            self.default_token.inner().to_owned(),
240            self.compression,
241        ));
242        let http_service = ServiceBuilder::new()
243            .settings(request_settings, HttpRetryLogic::default())
244            .service(build_http_batch_service(
245                client,
246                Arc::clone(&http_request_builder),
247                self.endpoint_target,
248                self.auto_extract_timestamp.unwrap_or_default(),
249            ));
250
251        let service = HecService::new(
252            http_service,
253            ack_client,
254            http_request_builder,
255            self.acknowledgements.clone(),
256        );
257
258        let batch_settings = self.batch.into_batcher_settings()?;
259
260        let sink = HecLogsSink {
261            service,
262            request_builder,
263            batch_settings,
264            sourcetype: self.sourcetype.clone(),
265            source: self.source.clone(),
266            index: self.index.clone(),
267            indexed_fields: self
268                .indexed_fields
269                .iter()
270                .map(|config_path| config_path.0.clone())
271                .collect(),
272            host_key: self.host_key.clone(),
273            timestamp_nanos_key: self.timestamp_nanos_key.clone(),
274            timestamp_key: self.timestamp_key.clone(),
275            endpoint_target: self.endpoint_target,
276            auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
277        };
278
279        Ok(VectorSink::from_event_streamsink(sink))
280    }
281}
282
#[cfg(test)]
mod tests {
    use vector_lib::{
        codecs::{JsonSerializerConfig, MetricTagValues, encoding::format::JsonSerializerOptions},
        config::LogNamespace,
    };

    use super::*;
    use crate::components::validation::prelude::*;

    /// Runs the shared `test_generate_config` check against this sink's `GenerateConfig` output.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HecLogsSinkConfig>();
    }

    impl ValidatableComponent for HecLogsSinkConfig {
        /// Component-validation setup with a single test case. The case pushes JSON-encoded
        /// events to the `raw` HEC endpoint of a local HTTP resource. Batches are capped at one
        /// event, retries are disabled, and indexer acknowledgements are off.
        fn validation_configuration() -> ValidationConfiguration {
            let base_endpoint = String::from("http://127.0.0.1:9001");
            let raw_endpoint = format!("{base_endpoint}/services/collector/raw");

            let mut batch = BatchConfig::default();
            batch.max_events = Some(1);

            let encoding = EncodingConfig::new(
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
                    .into(),
                Transformer::default(),
            );

            let config = Self {
                endpoint: base_endpoint,
                default_token: "i_am_an_island".to_string().into(),
                host_key: None,
                indexed_fields: vec![],
                index: None,
                sourcetype: None,
                source: None,
                encoding: encoding.clone(),
                compression: Compression::default(),
                batch,
                request: TowerRequestConfig {
                    timeout_secs: 2,
                    retry_attempts: 0,
                    ..Default::default()
                },
                tls: None,
                acknowledgements: HecClientAcknowledgementsConfig {
                    indexer_acknowledgements_enabled: false,
                    ..Default::default()
                },
                timestamp_nanos_key: None,
                timestamp_key: None,
                auto_extract_timestamp: None,
                endpoint_target: EndpointTarget::Raw,
            };

            let raw_uri = http::Uri::try_from(&raw_endpoint).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(raw_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HecLogsSinkConfig);
}