vector/sinks/splunk_hec/metrics/
config.rs

1use std::sync::Arc;
2
3use futures_util::FutureExt;
4use tower::ServiceBuilder;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7use vector_lib::sensitive_string::SensitiveString;
8use vector_lib::sink::VectorSink;
9
10use super::{request_builder::HecMetricsRequestBuilder, sink::HecMetricsSink};
11use crate::{
12    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::HttpClient,
14    sinks::{
15        splunk_hec::common::{
16            acknowledgements::HecClientAcknowledgementsConfig,
17            build_healthcheck, build_http_batch_service, config_host_key, create_client,
18            service::{HecService, HttpRequestBuilder},
19            EndpointTarget, SplunkHecDefaultBatchSettings,
20        },
21        util::{
22            http::HttpRetryLogic, BatchConfig, Compression, ServiceBuilderExt, TowerRequestConfig,
23        },
24        Healthcheck,
25    },
26    template::Template,
27    tls::TlsConfig,
28};
29
/// Configuration of the `splunk_hec_metrics` sink.
#[configurable_component(sink(
    "splunk_hec_metrics",
    "Deliver metric data to Splunk's HTTP Event Collector."
))]
#[derive(Clone, Debug)]
// Keys that do not correspond to a field below are rejected when the configuration is deserialized.
#[serde(deny_unknown_fields)]
pub struct HecMetricsSinkConfig {
    /// Sets the default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: Option<String>,

    /// Default Splunk HEC token.
    ///
    /// If an event has a token set in its metadata, it prevails over the one set here.
    // The key `token` is accepted as an alternative spelling when deserializing.
    #[serde(alias = "token")]
    #[configurable(metadata(
        docs::examples = "${SPLUNK_HEC_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    pub default_token: SensitiveString,

    /// The base URL of the Splunk instance.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`Splunk`][splunk] API are used.
    ///
    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
    #[configurable(metadata(
        docs::examples = "https://http-inputs-hec.splunkcloud.com",
        docs::examples = "https://hec.splunk.com:8088",
        docs::examples = "http://example.com"
    ))]
    #[configurable(validation(format = "uri"))]
    pub endpoint: String,

    /// Overrides the name of the log field used to retrieve the hostname to send to Splunk HEC.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    #[configurable(metadata(docs::advanced))]
    // When the key is omitted, the value comes from `config_host_key()`.
    #[serde(default = "config_host_key")]
    pub host_key: OptionalValuePath,

    /// The name of the index where to send the events to.
    ///
    /// If not specified, the default index defined within Splunk is used.
    // Stored as an optional `Template`; the examples show both a templated and a literal value.
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    pub index: Option<Template>,

    /// The sourcetype of events sent to this sink.
    ///
    /// If unset, Splunk defaults to `httpevent`.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
    pub sourcetype: Option<Template>,

    /// The source of events sent to this sink.
    ///
    /// This is typically the filename the logs originated from.
    ///
    /// If unset, the Splunk collector sets it.
    // NOTE(review): the description above talks about "logs", while this sink declares
    // `Input::metric()` as its input — confirm the wording is intended for a metrics sink.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::examples = "{{ file }}",
        docs::examples = "/var/log/syslog",
        docs::examples = "UDP:514"
    ))]
    pub source: Option<Template>,

    // The fields below marked `#[serde(default)]` may be omitted from the configuration; the
    // field type's `Default` implementation supplies the value in that case.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Passed by reference to `create_client` when the sink is built; `None` means no TLS
    // settings were supplied in the configuration.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Carries both the indexer-acknowledgement settings used by `build_processor` and the
    // `inner` acknowledgements config returned from `SinkConfig::acknowledgements`.
    #[configurable(derived)]
    #[serde(default)]
    pub acknowledgements: HecClientAcknowledgementsConfig,
}
123
124impl GenerateConfig for HecMetricsSinkConfig {
125    fn generate_config() -> toml::Value {
126        toml::Value::try_from(Self {
127            default_namespace: None,
128            default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
129            endpoint: "http://localhost:8088".to_owned(),
130            host_key: config_host_key(),
131            index: None,
132            sourcetype: None,
133            source: None,
134            compression: Compression::default(),
135            batch: BatchConfig::default(),
136            request: TowerRequestConfig::default(),
137            tls: None,
138            acknowledgements: Default::default(),
139        })
140        .unwrap()
141    }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "splunk_hec_metrics")]
146impl SinkConfig for HecMetricsSinkConfig {
147    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
148        let client = create_client(self.tls.as_ref(), cx.proxy())?;
149        let healthcheck = build_healthcheck(
150            self.endpoint.clone(),
151            self.default_token.inner().to_owned(),
152            client.clone(),
153        )
154        .boxed();
155        let sink = self.build_processor(client, cx)?;
156        Ok((sink, healthcheck))
157    }
158
159    fn input(&self) -> Input {
160        Input::metric()
161    }
162
163    fn acknowledgements(&self) -> &AcknowledgementsConfig {
164        &self.acknowledgements.inner
165    }
166}
167
168impl HecMetricsSinkConfig {
169    pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
170        let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
171            Some(client.clone())
172        } else {
173            None
174        };
175
176        let request_builder = HecMetricsRequestBuilder {
177            compression: self.compression,
178        };
179
180        let request_settings = self.request.into_settings();
181        let http_request_builder = Arc::new(HttpRequestBuilder::new(
182            self.endpoint.clone(),
183            EndpointTarget::default(),
184            self.default_token.inner().to_owned(),
185            self.compression,
186        ));
187        let http_service = ServiceBuilder::new()
188            .settings(request_settings, HttpRetryLogic::default())
189            .service(build_http_batch_service(
190                client,
191                Arc::clone(&http_request_builder),
192                EndpointTarget::Event,
193                false,
194            ));
195
196        let service = HecService::new(
197            http_service,
198            ack_client,
199            http_request_builder,
200            self.acknowledgements.clone(),
201        );
202
203        let batch_settings = self.batch.into_batcher_settings()?;
204
205        let sink = HecMetricsSink {
206            service,
207            batch_settings,
208            request_builder,
209            sourcetype: self.sourcetype.clone(),
210            source: self.source.clone(),
211            index: self.index.clone(),
212            host_key: self.host_key.path.clone(),
213            default_namespace: self.default_namespace.clone(),
214        };
215
216        Ok(VectorSink::from_event_streamsink(sink))
217    }
218}