vector/sinks/humio/
metrics.rs

1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use indoc::indoc;
5use vector_lib::{
6    codecs::JsonSerializerConfig,
7    configurable::configurable_component,
8    lookup,
9    lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath, OptionalValuePath},
10    sensitive_string::SensitiveString,
11    sink::StreamSink,
12};
13
14use super::{
15    config_host_key,
16    logs::{HOST, HumioLogsConfig},
17};
18use crate::{
19    config::{
20        AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
21    },
22    event::{Event, EventArray, EventContainer},
23    sinks::{
24        Healthcheck, VectorSink,
25        splunk_hec::common::SplunkHecDefaultBatchSettings,
26        util::{BatchConfig, Compression, TowerRequestConfig},
27    },
28    template::Template,
29    tls::TlsConfig,
30    transforms::{
31        FunctionTransform, OutputBuffer,
32        metric_to_log::{MetricToLog, MetricToLogConfig},
33    },
34};
35
/// Configuration for the `humio_metrics` sink.
//
// TODO: This sink overlaps almost entirely with the `humio_logs` sink except for the metric-to-log
// transform that it uses to get metrics into the shape of a log before sending to Humio. However,
// due to issues with aliased fields and flattened fields [1] in `serde`, we can't embed the
// `humio_logs` config here.
//
// When the sink is built, `token`, `endpoint`, `source`, `event_type`, `indexed_fields`, `index`,
// `compression`, `request`, `batch` and `tls` are copied unchanged into a `HumioLogsConfig`.
// That inner config always uses JSON encoding and a hard-coded `timestamp` key. `host_key` is
// re-rooted as an event target path, and the flattened `transform` settings build the
// metric-to-log transform that runs in front of the inner logs sink.
//
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten`. Confirm that misspelled options are really rejected for this sink.
//
// [1]: https://github.com/serde-rs/serde/issues/1504
#[configurable_component(sink("humio_metrics", "Deliver metric event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioMetricsConfig {
    // Flattened, so the metric-to-log options (e.g. `metric_tag_values`, as exercised by the
    // `multi_value_tags` test) are written at the top level of this sink's configuration.
    #[serde(flatten)]
    transform: MetricToLogConfig,

    /// The Humio ingestion token.
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    token: SensitiveString,

    /// The base URL of the Humio instance.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`Splunk`][splunk] API are used.
    ///
    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    // Accepted under either `endpoint` or the legacy `host` alias (see `test_endpoint_field`).
    pub(super) endpoint: String,

    /// The source of events sent to this sink.
    ///
    /// Typically the filename the metrics originated from. Maps to `@source` in Humio.
    source: Option<Template>,

    /// The type of events sent to this sink. Humio uses this as the name of the parser to use to ingest the data.
    ///
    /// If unset, Humio defaults it to none.
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    event_type: Option<Template>,

    /// Overrides the name of the log field used to retrieve the hostname to send to Humio.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    #[serde(default = "config_host_key")]
    // Converted to an event-prefixed `OptionalTargetPath` when the inner logs config is built.
    host_key: OptionalValuePath,

    /// Event fields to be added to Humio’s extra fields.
    ///
    /// Can be used to tag events by specifying fields starting with `#`.
    ///
    /// For more information, see [Humio’s Format of Data][humio_data_format].
    ///
    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
    #[serde(default)]
    indexed_fields: Vec<ConfigValuePath>,

    /// Optional name of the repository to ingest into.
    ///
    /// In public-facing APIs, this must (if present) be equal to the repository used to create the ingest token used for authentication.
    ///
    /// In private cluster setups, Humio can be configured to allow these to be different.
    ///
    /// For more information, see [Humio’s Format of Data][humio_data_format].
    ///
    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    index: Option<Template>,

    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    // Reported through `SinkConfig::acknowledgements`. The inner `humio_logs` config is built
    // with its default acknowledgements value rather than this one.
    acknowledgements: AcknowledgementsConfig,
}
142
143fn default_endpoint() -> String {
144    HOST.to_string()
145}
146
147impl GenerateConfig for HumioMetricsConfig {
148    fn generate_config() -> toml::Value {
149        toml::from_str(indoc! {r#"
150                host_key = "hostname"
151                token = "${HUMIO_TOKEN}"
152            "#})
153        .unwrap()
154    }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "humio_metrics")]
159impl SinkConfig for HumioMetricsConfig {
160    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
161        let transform = self
162            .transform
163            .build_transform(&TransformContext::new_with_globals(cx.globals.clone()));
164
165        let sink = HumioLogsConfig {
166            token: self.token.clone(),
167            endpoint: self.endpoint.clone(),
168            source: self.source.clone(),
169            encoding: JsonSerializerConfig::default().into(),
170            event_type: self.event_type.clone(),
171            host_key: OptionalTargetPath::from(
172                vrl::path::PathPrefix::Event,
173                self.host_key.path.clone(),
174            ),
175            indexed_fields: self.indexed_fields.clone(),
176            index: self.index.clone(),
177            compression: self.compression,
178            request: self.request,
179            batch: self.batch,
180            tls: self.tls.clone(),
181            timestamp_nanos_key: None,
182            acknowledgements: Default::default(),
183            // hard coded as humio expects this format so no sense in making it configurable
184            timestamp_key: OptionalTargetPath::from(
185                vrl::path::PathPrefix::Event,
186                Some(lookup::owned_value_path!("timestamp")),
187            ),
188        };
189
190        let (sink, healthcheck) = sink.clone().build(cx).await?;
191
192        let sink = HumioMetricsSink {
193            inner: sink,
194            transform,
195        };
196
197        Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
198    }
199
200    fn input(&self) -> Input {
201        Input::metric()
202    }
203
204    fn acknowledgements(&self) -> &AcknowledgementsConfig {
205        &self.acknowledgements
206    }
207}
208
209pub struct HumioMetricsSink {
210    inner: VectorSink,
211    transform: MetricToLog,
212}
213
214#[async_trait]
215impl StreamSink<EventArray> for HumioMetricsSink {
216    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
217        let mut transform = self.transform;
218        self.inner
219            .run(input.map(move |events| {
220                let mut buf = OutputBuffer::with_capacity(events.len());
221                for event in events.into_events() {
222                    transform.transform(&mut buf, event);
223                }
224                // Awkward but necessary for the `EventArray` type
225                let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
226                events.into()
227            }))
228            .await
229    }
230}
231
// Unit and local-HTTP-server tests for the `humio_metrics` sink: config generation, the
// `endpoint`/`host` alias, and the JSON bodies produced for metric events.
#[cfg(test)]
mod tests {
    use chrono::{Utc, offset::TimeZone};
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{
            Event, Metric,
            metric::{MetricKind, MetricValue, StatisticKind},
        },
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            self,
            components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
        },
    };

    // Exercises the `GenerateConfig` impl through the shared `test_generate_config` helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HumioMetricsConfig>();
    }

    // Both the `endpoint` key and its legacy `host` alias must populate `config.endpoint`.
    #[test]
    fn test_endpoint_field() {
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            endpoint = "https://localhost:9200/"
        "#})
        .unwrap();

        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
        // Same value, supplied through the `host` alias instead.
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            host = "https://localhost:9200/"
        "#})
        .unwrap();

        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
    }

    // Sends a counter and a distribution through the sink to a local test server and checks
    // the exact JSON body of each request. With `batch.max_events = 1`, each metric is expected
    // in its own request.
    #[tokio::test]
    async fn smoke_json() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
        "#})
        .unwrap();

        let addr = test_util::next_addr();
        // Swap out the endpoint so we can force send it
        // to our local server
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        // Make our test metrics.
        let metrics = vec![
            Event::from(
                Metric::new(
                    "metric1",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 42.0 },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
            Event::from(
                Metric::new(
                    "metric2",
                    MetricKind::Absolute,
                    MetricValue::Distribution {
                        samples: vector_lib::samples![1.0 => 100, 2.0 => 200, 3.0 => 300],
                        statistic: StatisticKind::Histogram,
                    },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 2)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
        ];

        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        // In the expected bodies, `time` equals each metric's timestamp as Unix seconds
        // (2020-08-18T21:00:01Z == 1597784401).
        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"os.host":"somehost"}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
        assert_eq!(
            r#"{"event":{"distribution":{"samples":[{"rate":100,"value":1.0},{"rate":200,"value":2.0},{"rate":300,"value":3.0}],"statistic":"histogram"},"kind":"absolute","name":"metric2","tags":{"os.host":"somehost"}},"fields":{},"time":1597784402.0}"#,
            output[1].1
        );
    }

    // With `metric_tag_values = "full"` (a flattened metric-to-log option), a tag key that
    // appears more than once is expected to be serialized as an array of all its values.
    #[tokio::test]
    async fn multi_value_tags() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            metric_tag_values = "full"
        "#})
        .unwrap();

        let addr = test_util::next_addr();
        // Swap out the endpoint so we can force send it
        // to our local server
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        // Make our test metrics.
        let metrics = vec![Event::from(
            Metric::new(
                "metric1",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_tags(Some(metric_tags!(
                "code" => "200",
                "code" => "success"
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                    .single()
                    .expect("invalid timestamp"),
            )),
        )];

        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"code":["200","success"]}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
    }
}