vector/sinks/splunk_hec/logs/config.rs

1use std::sync::Arc;
2
3use vector_lib::{
4    codecs::TextSerializerConfig,
5    lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
6    sensitive_string::SensitiveString,
7};
8
9use crate::{
10    http::HttpClient,
11    sinks::{
12        prelude::*,
13        splunk_hec::common::{
14            acknowledgements::HecClientAcknowledgementsConfig,
15            build_healthcheck, build_http_batch_service, create_client,
16            service::{HecService, HttpRequestBuilder},
17            EndpointTarget, SplunkHecDefaultBatchSettings,
18        },
19        util::http::HttpRetryLogic,
20    },
21};
22
23use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
24
25/// Configuration for the `splunk_hec_logs` sink.
26#[configurable_component(sink(
27    "splunk_hec_logs",
28    "Deliver log data to Splunk's HTTP Event Collector."
29))]
30#[derive(Clone, Debug)]
31#[serde(deny_unknown_fields)]
32pub struct HecLogsSinkConfig {
33    /// Default Splunk HEC token.
34    ///
35    /// If an event has a token set in its secrets (`splunk_hec_token`), it prevails over the one set here.
36    #[serde(alias = "token")]
37    pub default_token: SensitiveString,
38
39    /// The base URL of the Splunk instance.
40    ///
41    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
42    /// by the [`Splunk`][splunk] API are used.
43    ///
44    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
45    #[configurable(metadata(
46        docs::examples = "https://http-inputs-hec.splunkcloud.com",
47        docs::examples = "https://hec.splunk.com:8088",
48        docs::examples = "http://example.com"
49    ))]
50    #[configurable(validation(format = "uri"))]
51    pub endpoint: String,
52
53    /// Overrides the name of the log field used to retrieve the hostname to send to Splunk HEC.
54    ///
55    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
56    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
57    ///
58    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
59    // NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between a true
60    //       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
61    //       empty string to a `None` path internally.
62    #[configurable(metadata(docs::advanced))]
63    pub host_key: Option<OptionalTargetPath>,
64
65    /// Fields to be [added to Splunk index][splunk_field_index_docs].
66    ///
67    /// [splunk_field_index_docs]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/IFXandHEC
68    #[configurable(metadata(docs::advanced))]
69    #[serde(default)]
70    #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
71    pub indexed_fields: Vec<ConfigValuePath>,
72
73    /// The name of the index to send events to.
74    ///
75    /// If not specified, the default index defined within Splunk is used.
76    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
77    pub index: Option<Template>,
78
79    /// The sourcetype of events sent to this sink.
80    ///
81    /// If unset, Splunk defaults to `httpevent`.
82    #[configurable(metadata(docs::advanced))]
83    #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
84    pub sourcetype: Option<Template>,
85
86    /// The source of events sent to this sink.
87    ///
88    /// This is typically the filename the logs originated from.
89    ///
90    /// If unset, the Splunk collector sets it.
91    #[configurable(metadata(docs::advanced))]
92    #[configurable(metadata(
93        docs::examples = "{{ file }}",
94        docs::examples = "/var/log/syslog",
95        docs::examples = "UDP:514"
96    ))]
97    pub source: Option<Template>,
98
99    #[configurable(derived)]
100    pub encoding: EncodingConfig,
101
102    #[configurable(derived)]
103    #[serde(default)]
104    pub compression: Compression,
105
106    #[configurable(derived)]
107    #[serde(default)]
108    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
109
110    #[configurable(derived)]
111    #[serde(default)]
112    pub request: TowerRequestConfig,
113
114    #[configurable(derived)]
115    pub tls: Option<TlsConfig>,
116
117    #[configurable(derived)]
118    #[serde(default)]
119    pub acknowledgements: HecClientAcknowledgementsConfig,
120
121    // This settings is relevant only for the `humio_logs` sink and should be left as `None`
122    // everywhere else.
123    #[serde(skip)]
124    pub timestamp_nanos_key: Option<String>,
125
126    /// Overrides the name of the log field used to retrieve the timestamp to send to Splunk HEC.
127    /// When set to `“”`, a timestamp is not set in the events sent to Splunk HEC.
128    ///
129    /// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
130    /// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
131    ///
132    /// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
133    #[configurable(metadata(docs::advanced))]
134    #[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
135    // NOTE: The `OptionalTargetPath` is wrapped in an `Option` in order to distinguish between a true
136    //       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
137    //       empty string to a `None` path internally.
138    pub timestamp_key: Option<OptionalTargetPath>,
139
140    /// Passes the `auto_extract_timestamp` option to Splunk.
141    ///
142    /// This option is only relevant to Splunk v8.x and above, and is only applied when
143    /// `endpoint_target` is set to `event`.
144    ///
145    /// Setting this to `true` causes Splunk to extract the timestamp from the message text
146    /// rather than use the timestamp embedded in the event. The timestamp must be in the format
147    /// `yyyy-mm-dd hh:mm:ss`.
148    #[serde(default)]
149    pub auto_extract_timestamp: Option<bool>,
150
151    #[configurable(derived)]
152    #[configurable(metadata(docs::advanced))]
153    #[serde(default = "default_endpoint_target")]
154    pub endpoint_target: EndpointTarget,
155}
156
157const fn default_endpoint_target() -> EndpointTarget {
158    EndpointTarget::Event
159}
160
161impl GenerateConfig for HecLogsSinkConfig {
162    fn generate_config() -> toml::Value {
163        toml::Value::try_from(Self {
164            default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
165            endpoint: "endpoint".to_owned(),
166            host_key: None,
167            indexed_fields: vec![],
168            index: None,
169            sourcetype: None,
170            source: None,
171            encoding: TextSerializerConfig::default().into(),
172            compression: Compression::default(),
173            batch: BatchConfig::default(),
174            request: TowerRequestConfig::default(),
175            tls: None,
176            acknowledgements: Default::default(),
177            timestamp_nanos_key: None,
178            timestamp_key: None,
179            auto_extract_timestamp: None,
180            endpoint_target: EndpointTarget::Event,
181        })
182        .unwrap()
183    }
184}
185
186#[async_trait::async_trait]
187#[typetag::serde(name = "splunk_hec_logs")]
188impl SinkConfig for HecLogsSinkConfig {
189    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
190        if self.auto_extract_timestamp.is_some() && self.endpoint_target == EndpointTarget::Raw {
191            return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
192        }
193
194        let client = create_client(self.tls.as_ref(), cx.proxy())?;
195        let healthcheck = build_healthcheck(
196            self.endpoint.clone(),
197            self.default_token.inner().to_owned(),
198            client.clone(),
199        )
200        .boxed();
201        let sink = self.build_processor(client, cx)?;
202
203        Ok((sink, healthcheck))
204    }
205
206    fn input(&self) -> Input {
207        Input::new(self.encoding.config().input_type() & DataType::Log)
208    }
209
210    fn acknowledgements(&self) -> &AcknowledgementsConfig {
211        &self.acknowledgements.inner
212    }
213}
214
215impl HecLogsSinkConfig {
216    pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
217        let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
218            Some(client.clone())
219        } else {
220            None
221        };
222
223        let transformer = self.encoding.transformer();
224        let serializer = self.encoding.build()?;
225        let encoder = Encoder::<()>::new(serializer);
226        let encoder = HecLogsEncoder {
227            transformer,
228            encoder,
229            auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
230        };
231        let request_builder = HecLogsRequestBuilder {
232            encoder,
233            compression: self.compression,
234        };
235
236        let request_settings = self.request.into_settings();
237        let http_request_builder = Arc::new(HttpRequestBuilder::new(
238            self.endpoint.clone(),
239            self.endpoint_target,
240            self.default_token.inner().to_owned(),
241            self.compression,
242        ));
243        let http_service = ServiceBuilder::new()
244            .settings(request_settings, HttpRetryLogic::default())
245            .service(build_http_batch_service(
246                client,
247                Arc::clone(&http_request_builder),
248                self.endpoint_target,
249                self.auto_extract_timestamp.unwrap_or_default(),
250            ));
251
252        let service = HecService::new(
253            http_service,
254            ack_client,
255            http_request_builder,
256            self.acknowledgements.clone(),
257        );
258
259        let batch_settings = self.batch.into_batcher_settings()?;
260
261        let sink = HecLogsSink {
262            service,
263            request_builder,
264            batch_settings,
265            sourcetype: self.sourcetype.clone(),
266            source: self.source.clone(),
267            index: self.index.clone(),
268            indexed_fields: self
269                .indexed_fields
270                .iter()
271                .map(|config_path| config_path.0.clone())
272                .collect(),
273            host_key: self.host_key.clone(),
274            timestamp_nanos_key: self.timestamp_nanos_key.clone(),
275            timestamp_key: self.timestamp_key.clone(),
276            endpoint_target: self.endpoint_target,
277            auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(),
278        };
279
280        Ok(VectorSink::from_event_streamsink(sink))
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use crate::components::validation::prelude::*;
    use vector_lib::{
        codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
        config::LogNamespace,
    };

    /// Runs the shared `test_generate_config` helper against this sink's example config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HecLogsSinkConfig>();
    }

    impl ValidatableComponent for HecLogsSinkConfig {
        /// Describes one component-validation case.
        ///
        /// The sink pushes JSON-encoded events to a local HTTP resource at the HEC `raw`
        /// collector path. Batches are capped at one event, retries are disabled, and indexer
        /// acknowledgements are off.
        fn validation_configuration() -> ValidationConfiguration {
            let base_url = String::from("http://127.0.0.1:9001");

            // Cap each batch at a single event.
            let mut batch = BatchConfig::default();
            batch.max_events = Some(1);

            let json_encoding = EncodingConfig::new(
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
                    .into(),
                Transformer::default(),
            );

            let config = Self {
                endpoint: base_url.clone(),
                default_token: String::from("i_am_an_island").into(),
                host_key: None,
                indexed_fields: Vec::new(),
                index: None,
                sourcetype: None,
                source: None,
                encoding: json_encoding,
                compression: Compression::default(),
                batch,
                request: TowerRequestConfig {
                    timeout_secs: 2,
                    retry_attempts: 0,
                    ..Default::default()
                },
                tls: None,
                acknowledgements: HecClientAcknowledgementsConfig {
                    indexer_acknowledgements_enabled: false,
                    ..Default::default()
                },
                timestamp_nanos_key: None,
                timestamp_key: None,
                auto_extract_timestamp: None,
                endpoint_target: EndpointTarget::Raw,
            };

            // The external resource listens where the sink sends `raw` endpoint requests.
            let raw_collector_url = format!("{base_url}/services/collector/raw");
            let raw_collector_uri =
                http::Uri::try_from(&raw_collector_url).expect("should not fail to parse URI");

            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(raw_collector_uri, None),
                config.encoding.clone(),
            );

            let test_case = ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HecLogsSinkConfig);
}