vector/sinks/splunk_hec/metrics/
config.rs1use std::sync::Arc;
2
3use futures_util::FutureExt;
4use tower::ServiceBuilder;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7use vector_lib::sensitive_string::SensitiveString;
8use vector_lib::sink::VectorSink;
9
10use super::{request_builder::HecMetricsRequestBuilder, sink::HecMetricsSink};
11use crate::{
12 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::HttpClient,
14 sinks::{
15 splunk_hec::common::{
16 acknowledgements::HecClientAcknowledgementsConfig,
17 build_healthcheck, build_http_batch_service, config_host_key, create_client,
18 service::{HecService, HttpRequestBuilder},
19 EndpointTarget, SplunkHecDefaultBatchSettings,
20 },
21 util::{
22 http::HttpRetryLogic, BatchConfig, Compression, ServiceBuilderExt, TowerRequestConfig,
23 },
24 Healthcheck,
25 },
26 template::Template,
27 tls::TlsConfig,
28};
29
30#[configurable_component(sink(
32 "splunk_hec_metrics",
33 "Deliver metric data to Splunk's HTTP Event Collector."
34))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct HecMetricsSinkConfig {
38 #[configurable(metadata(docs::examples = "service"))]
43 pub default_namespace: Option<String>,
44
45 #[serde(alias = "token")]
49 #[configurable(metadata(
50 docs::examples = "${SPLUNK_HEC_TOKEN}",
51 docs::examples = "A94A8FE5CCB19BA61C4C08"
52 ))]
53 pub default_token: SensitiveString,
54
55 #[configurable(metadata(
62 docs::examples = "https://http-inputs-hec.splunkcloud.com",
63 docs::examples = "https://hec.splunk.com:8088",
64 docs::examples = "http://example.com"
65 ))]
66 #[configurable(validation(format = "uri"))]
67 pub endpoint: String,
68
69 #[configurable(metadata(docs::advanced))]
75 #[serde(default = "config_host_key")]
76 pub host_key: OptionalValuePath,
77
78 #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
82 pub index: Option<Template>,
83
84 #[configurable(metadata(docs::advanced))]
88 #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
89 pub sourcetype: Option<Template>,
90
91 #[configurable(metadata(docs::advanced))]
97 #[configurable(metadata(
98 docs::examples = "{{ file }}",
99 docs::examples = "/var/log/syslog",
100 docs::examples = "UDP:514"
101 ))]
102 pub source: Option<Template>,
103
104 #[configurable(derived)]
105 #[serde(default)]
106 pub compression: Compression,
107
108 #[configurable(derived)]
109 #[serde(default)]
110 pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
111
112 #[configurable(derived)]
113 #[serde(default)]
114 pub request: TowerRequestConfig,
115
116 #[configurable(derived)]
117 pub tls: Option<TlsConfig>,
118
119 #[configurable(derived)]
120 #[serde(default)]
121 pub acknowledgements: HecClientAcknowledgementsConfig,
122}
123
124impl GenerateConfig for HecMetricsSinkConfig {
125 fn generate_config() -> toml::Value {
126 toml::Value::try_from(Self {
127 default_namespace: None,
128 default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
129 endpoint: "http://localhost:8088".to_owned(),
130 host_key: config_host_key(),
131 index: None,
132 sourcetype: None,
133 source: None,
134 compression: Compression::default(),
135 batch: BatchConfig::default(),
136 request: TowerRequestConfig::default(),
137 tls: None,
138 acknowledgements: Default::default(),
139 })
140 .unwrap()
141 }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "splunk_hec_metrics")]
146impl SinkConfig for HecMetricsSinkConfig {
147 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
148 let client = create_client(self.tls.as_ref(), cx.proxy())?;
149 let healthcheck = build_healthcheck(
150 self.endpoint.clone(),
151 self.default_token.inner().to_owned(),
152 client.clone(),
153 )
154 .boxed();
155 let sink = self.build_processor(client, cx)?;
156 Ok((sink, healthcheck))
157 }
158
159 fn input(&self) -> Input {
160 Input::metric()
161 }
162
163 fn acknowledgements(&self) -> &AcknowledgementsConfig {
164 &self.acknowledgements.inner
165 }
166}
167
168impl HecMetricsSinkConfig {
169 pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
170 let ack_client = if self.acknowledgements.indexer_acknowledgements_enabled {
171 Some(client.clone())
172 } else {
173 None
174 };
175
176 let request_builder = HecMetricsRequestBuilder {
177 compression: self.compression,
178 };
179
180 let request_settings = self.request.into_settings();
181 let http_request_builder = Arc::new(HttpRequestBuilder::new(
182 self.endpoint.clone(),
183 EndpointTarget::default(),
184 self.default_token.inner().to_owned(),
185 self.compression,
186 ));
187 let http_service = ServiceBuilder::new()
188 .settings(request_settings, HttpRetryLogic::default())
189 .service(build_http_batch_service(
190 client,
191 Arc::clone(&http_request_builder),
192 EndpointTarget::Event,
193 false,
194 ));
195
196 let service = HecService::new(
197 http_service,
198 ack_client,
199 http_request_builder,
200 self.acknowledgements.clone(),
201 );
202
203 let batch_settings = self.batch.into_batcher_settings()?;
204
205 let sink = HecMetricsSink {
206 service,
207 batch_settings,
208 request_builder,
209 sourcetype: self.sourcetype.clone(),
210 source: self.source.clone(),
211 index: self.index.clone(),
212 host_key: self.host_key.path.clone(),
213 default_namespace: self.default_namespace.clone(),
214 };
215
216 Ok(VectorSink::from_event_streamsink(sink))
217 }
218}