1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use indoc::indoc;
5use vector_lib::{
6 codecs::JsonSerializerConfig,
7 configurable::configurable_component,
8 lookup,
9 lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath, OptionalValuePath},
10 sensitive_string::SensitiveString,
11 sink::StreamSink,
12};
13
14use super::{
15 config_host_key,
16 logs::{HOST, HumioLogsConfig},
17};
18use crate::{
19 config::{
20 AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
21 },
22 event::{Event, EventArray, EventContainer},
23 sinks::{
24 Healthcheck, VectorSink,
25 splunk_hec::common::SplunkHecDefaultBatchSettings,
26 util::{BatchConfig, Compression, TowerRequestConfig},
27 },
28 template::Template,
29 tls::TlsConfig,
30 transforms::{
31 FunctionTransform, OutputBuffer,
32 metric_to_log::{MetricToLog, MetricToLogConfig},
33 },
34};
35
36#[configurable_component(sink("humio_metrics", "Deliver metric event data to Humio."))]
45#[derive(Clone, Debug)]
46#[serde(deny_unknown_fields)]
47pub struct HumioMetricsConfig {
48 #[serde(flatten)]
49 transform: MetricToLogConfig,
50
51 #[configurable(metadata(
53 docs::examples = "${HUMIO_TOKEN}",
54 docs::examples = "A94A8FE5CCB19BA61C4C08"
55 ))]
56 token: SensitiveString,
57
58 #[serde(alias = "host")]
65 #[serde(default = "default_endpoint")]
66 #[configurable(metadata(
67 docs::examples = "http://127.0.0.1",
68 docs::examples = "https://example.com",
69 ))]
70 pub(super) endpoint: String,
71
72 source: Option<Template>,
76
77 #[configurable(metadata(
81 docs::examples = "json",
82 docs::examples = "none",
83 docs::examples = "{{ event_type }}"
84 ))]
85 event_type: Option<Template>,
86
87 #[serde(default = "config_host_key")]
94 host_key: OptionalValuePath,
95
96 #[serde(default)]
104 indexed_fields: Vec<ConfigValuePath>,
105
106 #[serde(default)]
116 #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
117 index: Option<Template>,
118
119 #[configurable(derived)]
120 #[serde(default)]
121 compression: Compression,
122
123 #[configurable(derived)]
124 #[serde(default)]
125 request: TowerRequestConfig,
126
127 #[configurable(derived)]
128 #[serde(default)]
129 batch: BatchConfig<SplunkHecDefaultBatchSettings>,
130
131 #[configurable(derived)]
132 tls: Option<TlsConfig>,
133
134 #[configurable(derived)]
135 #[serde(
136 default,
137 deserialize_with = "crate::serde::bool_or_struct",
138 skip_serializing_if = "crate::serde::is_default"
139 )]
140 acknowledgements: AcknowledgementsConfig,
141}
142
143fn default_endpoint() -> String {
144 HOST.to_string()
145}
146
147impl GenerateConfig for HumioMetricsConfig {
148 fn generate_config() -> toml::Value {
149 toml::from_str(indoc! {r#"
150 host_key = "hostname"
151 token = "${HUMIO_TOKEN}"
152 "#})
153 .unwrap()
154 }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "humio_metrics")]
159impl SinkConfig for HumioMetricsConfig {
160 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
161 let transform = self
162 .transform
163 .build_transform(&TransformContext::new_with_globals(cx.globals.clone()));
164
165 let sink = HumioLogsConfig {
166 token: self.token.clone(),
167 endpoint: self.endpoint.clone(),
168 source: self.source.clone(),
169 encoding: JsonSerializerConfig::default().into(),
170 event_type: self.event_type.clone(),
171 host_key: OptionalTargetPath::from(
172 vrl::path::PathPrefix::Event,
173 self.host_key.path.clone(),
174 ),
175 indexed_fields: self.indexed_fields.clone(),
176 index: self.index.clone(),
177 compression: self.compression,
178 request: self.request,
179 batch: self.batch,
180 tls: self.tls.clone(),
181 timestamp_nanos_key: None,
182 acknowledgements: Default::default(),
183 timestamp_key: OptionalTargetPath::from(
185 vrl::path::PathPrefix::Event,
186 Some(lookup::owned_value_path!("timestamp")),
187 ),
188 };
189
190 let (sink, healthcheck) = sink.clone().build(cx).await?;
191
192 let sink = HumioMetricsSink {
193 inner: sink,
194 transform,
195 };
196
197 Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
198 }
199
200 fn input(&self) -> Input {
201 Input::metric()
202 }
203
204 fn acknowledgements(&self) -> &AcknowledgementsConfig {
205 &self.acknowledgements
206 }
207}
208
209pub struct HumioMetricsSink {
210 inner: VectorSink,
211 transform: MetricToLog,
212}
213
214#[async_trait]
215impl StreamSink<EventArray> for HumioMetricsSink {
216 async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
217 let mut transform = self.transform;
218 self.inner
219 .run(input.map(move |events| {
220 let mut buf = OutputBuffer::with_capacity(events.len());
221 for event in events.into_events() {
222 transform.transform(&mut buf, event);
223 }
224 let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
226 events.into()
227 }))
228 .await
229 }
230}
231
#[cfg(test)]
mod tests {
    use chrono::{Utc, offset::TimeZone};
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{
            Event, Metric,
            metric::{MetricKind, MetricValue, StatisticKind},
        },
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            self,
            components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
        },
    };

    /// Returns the UTC instant 2020-08-18 21:00:`second` used by the metric fixtures.
    fn fixed_timestamp(second: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, second)
            .single()
            .expect("invalid timestamp")
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HumioMetricsConfig>();
    }

    /// The endpoint can be supplied under either `endpoint` or its alias `host`.
    #[test]
    fn test_endpoint_field() {
        for key in ["endpoint", "host"] {
            let toml = format!(
                "token = \"atoken\"\nbatch.max_events = 1\n{key} = \"https://localhost:9200/\"\n"
            );
            let (config, _) = load_sink::<HumioMetricsConfig>(&toml).unwrap();

            assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
        }
    }

    /// A counter and a distribution are each delivered as one JSON request body.
    #[tokio::test]
    async fn smoke_json() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
        "#})
        .unwrap();

        // Point the sink at the local test server started below.
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let counter = Metric::new(
            "metric1",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_tags(Some(metric_tags!("os.host" => "somehost")))
        .with_timestamp(Some(fixed_timestamp(1)));

        let distribution = Metric::new(
            "metric2",
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 100, 2.0 => 200, 3.0 => 300],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(metric_tags!("os.host" => "somehost")))
        .with_timestamp(Some(fixed_timestamp(2)));

        let metrics = vec![Event::from(counter), Event::from(distribution)];
        let expected_requests = metrics.len();

        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        let requests = rx.take(expected_requests).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"os.host":"somehost"}},"fields":{},"time":1597784401.0}"#,
            requests[0].1
        );
        assert_eq!(
            r#"{"event":{"distribution":{"samples":[{"rate":100,"value":1.0},{"rate":200,"value":2.0},{"rate":300,"value":3.0}],"statistic":"histogram"},"kind":"absolute","name":"metric2","tags":{"os.host":"somehost"}},"fields":{},"time":1597784402.0}"#,
            requests[1].1
        );
    }

    /// With `metric_tag_values = "full"`, a tag with several values is emitted as a JSON array.
    #[tokio::test]
    async fn multi_value_tags() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            metric_tag_values = "full"
        "#})
        .unwrap();

        // Point the sink at the local test server started below.
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let counter = Metric::new(
            "metric1",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_tags(Some(metric_tags!(
            "code" => "200",
            "code" => "success"
        )))
        .with_timestamp(Some(fixed_timestamp(1)));

        let metrics = vec![Event::from(counter)];
        let expected_requests = metrics.len();

        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        let requests = rx.take(expected_requests).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"code":["200","success"]}},"fields":{},"time":1597784401.0}"#,
            requests[0].1
        );
    }
}