1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use indoc::indoc;
5use vector_lib::codecs::JsonSerializerConfig;
6use vector_lib::configurable::configurable_component;
7use vector_lib::lookup;
8use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath, OptionalValuePath};
9use vector_lib::sensitive_string::SensitiveString;
10use vector_lib::sink::StreamSink;
11
12use super::{
13 config_host_key,
14 logs::{HumioLogsConfig, HOST},
15};
16use crate::{
17 config::{
18 AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
19 },
20 event::{Event, EventArray, EventContainer},
21 sinks::{
22 splunk_hec::common::SplunkHecDefaultBatchSettings,
23 util::{BatchConfig, Compression, TowerRequestConfig},
24 Healthcheck, VectorSink,
25 },
26 template::Template,
27 tls::TlsConfig,
28 transforms::{
29 metric_to_log::{MetricToLog, MetricToLogConfig},
30 FunctionTransform, OutputBuffer,
31 },
32};
33
/// Configuration for the `humio_metrics` sink.
///
/// Incoming metrics are converted to log events by the flattened metric-to-log transform and
/// are then delivered through a sink built from `HumioLogsConfig`, so most of the options
/// below are passed straight through to that logs sink.
#[configurable_component(sink("humio_metrics", "Deliver metric event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioMetricsConfig {
    /// Settings of the metric-to-log transform, flattened into this sink's own options.
    // NOTE(review): serde documents `flatten` as unsupported together with
    // `deny_unknown_fields`. The `multi_value_tags` test shows a flattened key being
    // accepted; confirm that unknown keys are still rejected as intended.
    #[serde(flatten)]
    transform: MetricToLogConfig,

    /// The Humio token, passed unchanged to the underlying logs sink.
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    token: SensitiveString,

    /// The endpoint that events are sent to.
    ///
    /// May also be given under the alias `host`. When omitted, the value returned by
    /// `default_endpoint` is used.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    pub(super) endpoint: String,

    /// Optional template forwarded as the `source` option of the underlying logs sink.
    // NOTE(review): the exact Humio-side meaning is defined by `HumioLogsConfig` — confirm
    // there before expanding this description.
    source: Option<Template>,

    /// Optional template forwarded as the `event_type` option of the underlying logs sink.
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    event_type: Option<Template>,

    /// Path of the event field used as the host key.
    ///
    /// The path is resolved against the event itself when the underlying logs sink is
    /// configured. When omitted, the value returned by `config_host_key` is used.
    #[serde(default = "config_host_key")]
    host_key: OptionalValuePath,

    /// Event field paths forwarded as the `indexed_fields` option of the underlying logs sink.
    ///
    /// Empty when omitted.
    #[serde(default)]
    indexed_fields: Vec<ConfigValuePath>,

    /// Optional template forwarded as the `index` option of the underlying logs sink.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    index: Option<Template>,

    // The four options below take their descriptions from their own types (`derived`) and are
    // handed to the underlying logs sink as-is.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    // Reported through `SinkConfig::acknowledgements`. Deserialized with
    // `crate::serde::bool_or_struct` and left out of serialized output when it equals the
    // default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
140
141fn default_endpoint() -> String {
142 HOST.to_string()
143}
144
145impl GenerateConfig for HumioMetricsConfig {
146 fn generate_config() -> toml::Value {
147 toml::from_str(indoc! {r#"
148 host_key = "hostname"
149 token = "${HUMIO_TOKEN}"
150 "#})
151 .unwrap()
152 }
153}
154
155#[async_trait::async_trait]
156#[typetag::serde(name = "humio_metrics")]
157impl SinkConfig for HumioMetricsConfig {
158 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
159 let transform = self
160 .transform
161 .build_transform(&TransformContext::new_with_globals(cx.globals.clone()));
162
163 let sink = HumioLogsConfig {
164 token: self.token.clone(),
165 endpoint: self.endpoint.clone(),
166 source: self.source.clone(),
167 encoding: JsonSerializerConfig::default().into(),
168 event_type: self.event_type.clone(),
169 host_key: OptionalTargetPath::from(
170 vrl::path::PathPrefix::Event,
171 self.host_key.path.clone(),
172 ),
173 indexed_fields: self.indexed_fields.clone(),
174 index: self.index.clone(),
175 compression: self.compression,
176 request: self.request,
177 batch: self.batch,
178 tls: self.tls.clone(),
179 timestamp_nanos_key: None,
180 acknowledgements: Default::default(),
181 timestamp_key: OptionalTargetPath::from(
183 vrl::path::PathPrefix::Event,
184 Some(lookup::owned_value_path!("timestamp")),
185 ),
186 };
187
188 let (sink, healthcheck) = sink.clone().build(cx).await?;
189
190 let sink = HumioMetricsSink {
191 inner: sink,
192 transform,
193 };
194
195 Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
196 }
197
198 fn input(&self) -> Input {
199 Input::metric()
200 }
201
202 fn acknowledgements(&self) -> &AcknowledgementsConfig {
203 &self.acknowledgements
204 }
205}
206
/// Sink that converts incoming events with a metric-to-log transform and forwards the
/// resulting log events to an inner sink.
pub struct HumioMetricsSink {
    /// Sink that receives the converted log events; built from `HumioLogsConfig`.
    inner: VectorSink,
    /// Transform applied to every incoming event before it is forwarded.
    transform: MetricToLog,
}
211
212#[async_trait]
213impl StreamSink<EventArray> for HumioMetricsSink {
214 async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
215 let mut transform = self.transform;
216 self.inner
217 .run(input.map(move |events| {
218 let mut buf = OutputBuffer::with_capacity(events.len());
219 for event in events.into_events() {
220 transform.transform(&mut buf, event);
221 }
222 let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
224 events.into()
225 }))
226 .await
227 }
228}
229
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{
            metric::{MetricKind, MetricValue, StatisticKind},
            Event, Metric,
        },
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            self,
            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        },
    };

    /// Timestamp on 2020-08-18 at 21:00 UTC with the given seconds component.
    fn sample_timestamp(second: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, second)
            .single()
            .expect("invalid timestamp")
    }

    /// The generated example configuration must be accepted by the config type.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<HumioMetricsConfig>();
    }

    /// The endpoint can be supplied under either `endpoint` or its alias `host`.
    #[test]
    fn test_endpoint_field() {
        let expected = "https://localhost:9200/".to_string();

        let (with_endpoint, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            endpoint = "https://localhost:9200/"
        "#})
        .unwrap();
        assert_eq!(expected, with_endpoint.endpoint);

        let (with_alias, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            host = "https://localhost:9200/"
        "#})
        .unwrap();
        assert_eq!(expected, with_alias.endpoint);
    }

    /// Sends a counter and a distribution and checks the JSON body of each request.
    #[tokio::test]
    async fn smoke_json() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
        "#})
        .unwrap();

        // Point the sink at the local test server instead of the default endpoint.
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let counter = Metric::new(
            "metric1",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_tags(Some(metric_tags!("os.host" => "somehost")))
        .with_timestamp(Some(sample_timestamp(1)));

        let distribution = Metric::new(
            "metric2",
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 100, 2.0 => 200, 3.0 => 300],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(metric_tags!("os.host" => "somehost")))
        .with_timestamp(Some(sample_timestamp(2)));

        let metrics = vec![Event::from(counter), Event::from(distribution)];
        let expected_requests = metrics.len();

        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        // `batch.max_events = 1` in the config above, so one request is collected per metric.
        let received: Vec<_> = rx.take(expected_requests).collect().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"os.host":"somehost"}},"fields":{},"time":1597784401.0}"#,
            received[0].1
        );
        assert_eq!(
            r#"{"event":{"distribution":{"samples":[{"rate":100,"value":1.0},{"rate":200,"value":2.0},{"rate":300,"value":3.0}],"statistic":"histogram"},"kind":"absolute","name":"metric2","tags":{"os.host":"somehost"}},"fields":{},"time":1597784402.0}"#,
            received[1].1
        );
    }

    /// With `metric_tag_values = "full"`, a tag with several values is emitted as an array.
    #[tokio::test]
    async fn multi_value_tags() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            metric_tag_values = "full"
        "#})
        .unwrap();

        // Point the sink at the local test server instead of the default endpoint.
        let addr = test_util::next_addr();
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        let counter = Metric::new(
            "metric1",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_tags(Some(metric_tags!(
            "code" => "200",
            "code" => "success"
        )))
        .with_timestamp(Some(sample_timestamp(1)));

        let metrics = vec![Event::from(counter)];
        let expected_requests = metrics.len();

        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        let received: Vec<_> = rx.take(expected_requests).collect().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"code":["200","success"]}},"fields":{},"time":1597784401.0}"#,
            received[0].1
        );
    }
}
387}