vector/sinks/humio/
metrics.rs

1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use indoc::indoc;
5use vector_lib::codecs::JsonSerializerConfig;
6use vector_lib::configurable::configurable_component;
7use vector_lib::lookup;
8use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath, OptionalValuePath};
9use vector_lib::sensitive_string::SensitiveString;
10use vector_lib::sink::StreamSink;
11
12use super::{
13    config_host_key,
14    logs::{HumioLogsConfig, HOST},
15};
16use crate::{
17    config::{
18        AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext, TransformContext,
19    },
20    event::{Event, EventArray, EventContainer},
21    sinks::{
22        splunk_hec::common::SplunkHecDefaultBatchSettings,
23        util::{BatchConfig, Compression, TowerRequestConfig},
24        Healthcheck, VectorSink,
25    },
26    template::Template,
27    tls::TlsConfig,
28    transforms::{
29        metric_to_log::{MetricToLog, MetricToLogConfig},
30        FunctionTransform, OutputBuffer,
31    },
32};
33
/// Configuration for the `humio_metrics` sink.
//
// TODO: This sink overlaps almost entirely with the `humio_logs` sink except for the metric-to-log
// transform that it uses to get metrics into the shape of a log before sending to Humio. However,
// due to issues with aliased fields and flattened fields [1] in `serde`, we can't embed the
// `humio_logs` config here.
//
// [1]: https://github.com/serde-rs/serde/issues/1504
#[configurable_component(sink("humio_metrics", "Deliver metric event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioMetricsConfig {
    // The metric-to-log transform options are flattened into this sink's own option namespace
    // (for example `metric_tag_values`), and are used in `build` to construct the transform
    // that is applied to every incoming metric.
    #[serde(flatten)]
    transform: MetricToLogConfig,

    /// The Humio ingestion token.
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    token: SensitiveString,

    /// The base URL of the Humio instance.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`Splunk`][splunk] API are used.
    ///
    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
    // `host` is accepted as an alternative spelling of `endpoint`; when neither is given the
    // value falls back to `default_endpoint()`.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    pub(super) endpoint: String,

    /// The source of events sent to this sink.
    ///
    /// Typically the filename the metrics originated from. Maps to `@source` in Humio.
    source: Option<Template>,

    /// The type of events sent to this sink. Humio uses this as the name of the parser to use to ingest the data.
    ///
    /// If unset, Humio defaults it to none.
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    event_type: Option<Template>,

    /// Overrides the name of the log field used to retrieve the hostname to send to Humio.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    // Stored as a value path here; `build` re-wraps it as an event-prefixed target path for the
    // underlying logs sink.
    #[serde(default = "config_host_key")]
    host_key: OptionalValuePath,

    /// Event fields to be added to Humio’s extra fields.
    ///
    /// Can be used to tag events by specifying fields starting with `#`.
    ///
    /// For more information, see [Humio’s Format of Data][humio_data_format].
    ///
    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
    #[serde(default)]
    indexed_fields: Vec<ConfigValuePath>,

    /// Optional name of the repository to ingest into.
    ///
    /// In public-facing APIs, this must (if present) be equal to the repository used to create the ingest token used for authentication.
    ///
    /// In private cluster setups, Humio can be configured to allow these to be different.
    ///
    /// For more information, see [Humio’s Format of Data][humio_data_format].
    ///
    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    index: Option<Template>,

    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    // Batch defaults come from the Splunk HEC default batch settings type.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    // NOTE(review): `bool_or_struct` presumably lets this option be written either as a bare
    // boolean or as a full table — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
140
141fn default_endpoint() -> String {
142    HOST.to_string()
143}
144
145impl GenerateConfig for HumioMetricsConfig {
146    fn generate_config() -> toml::Value {
147        toml::from_str(indoc! {r#"
148                host_key = "hostname"
149                token = "${HUMIO_TOKEN}"
150            "#})
151        .unwrap()
152    }
153}
154
155#[async_trait::async_trait]
156#[typetag::serde(name = "humio_metrics")]
157impl SinkConfig for HumioMetricsConfig {
158    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
159        let transform = self
160            .transform
161            .build_transform(&TransformContext::new_with_globals(cx.globals.clone()));
162
163        let sink = HumioLogsConfig {
164            token: self.token.clone(),
165            endpoint: self.endpoint.clone(),
166            source: self.source.clone(),
167            encoding: JsonSerializerConfig::default().into(),
168            event_type: self.event_type.clone(),
169            host_key: OptionalTargetPath::from(
170                vrl::path::PathPrefix::Event,
171                self.host_key.path.clone(),
172            ),
173            indexed_fields: self.indexed_fields.clone(),
174            index: self.index.clone(),
175            compression: self.compression,
176            request: self.request,
177            batch: self.batch,
178            tls: self.tls.clone(),
179            timestamp_nanos_key: None,
180            acknowledgements: Default::default(),
181            // hard coded as humio expects this format so no sense in making it configurable
182            timestamp_key: OptionalTargetPath::from(
183                vrl::path::PathPrefix::Event,
184                Some(lookup::owned_value_path!("timestamp")),
185            ),
186        };
187
188        let (sink, healthcheck) = sink.clone().build(cx).await?;
189
190        let sink = HumioMetricsSink {
191            inner: sink,
192            transform,
193        };
194
195        Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
196    }
197
198    fn input(&self) -> Input {
199        Input::metric()
200    }
201
202    fn acknowledgements(&self) -> &AcknowledgementsConfig {
203        &self.acknowledgements
204    }
205}
206
/// Stream sink that converts every incoming event with the metric-to-log transform and then
/// forwards the resulting log events to the wrapped sink.
pub struct HumioMetricsSink {
    // The sink built from the equivalent `HumioLogsConfig`; receives the converted events.
    inner: VectorSink,
    // Transform applied to each event before it is handed to `inner`.
    transform: MetricToLog,
}
211
212#[async_trait]
213impl StreamSink<EventArray> for HumioMetricsSink {
214    async fn run(self: Box<Self>, input: BoxStream<'_, EventArray>) -> Result<(), ()> {
215        let mut transform = self.transform;
216        self.inner
217            .run(input.map(move |events| {
218                let mut buf = OutputBuffer::with_capacity(events.len());
219                for event in events.into_events() {
220                    transform.transform(&mut buf, event);
221                }
222                // Awkward but necessary for the `EventArray` type
223                let events = buf.into_events().map(Event::into_log).collect::<Vec<_>>();
224                events.into()
225            }))
226            .await
227    }
228}
229
// Unit tests for the `humio_metrics` sink: config generation, the `endpoint`/`host` alias, and
// the JSON bodies sent to a local test server.
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{
            metric::{MetricKind, MetricValue, StatisticKind},
            Event, Metric,
        },
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            self,
            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        },
    };

    // Runs the shared generated-config check against this sink's example configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HumioMetricsConfig>();
    }

    // Both `endpoint` and its `host` alias must populate the same `endpoint` field.
    #[test]
    fn test_endpoint_field() {
        // Configured through the canonical `endpoint` key.
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            endpoint = "https://localhost:9200/"
        "#})
        .unwrap();

        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
        // Configured through the `host` alias.
        let (config, _) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            host = "https://localhost:9200/"
        "#})
        .unwrap();

        assert_eq!("https://localhost:9200/".to_string(), config.endpoint);
    }

    // Sends a counter and a distribution through the sink and checks the exact JSON body
    // received for each. With `batch.max_events = 1` the test expects one request per metric.
    #[tokio::test]
    async fn smoke_json() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
        "#})
        .unwrap();

        let addr = test_util::next_addr();
        // Swap out the endpoint so we can force send it
        // to our local server
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        // Make our test metrics.
        let metrics = vec![
            Event::from(
                Metric::new(
                    "metric1",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 42.0 },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
            Event::from(
                Metric::new(
                    "metric2",
                    MetricKind::Absolute,
                    MetricValue::Distribution {
                        samples: vector_lib::samples![1.0 => 100, 2.0 => 200, 3.0 => 300],
                        statistic: StatisticKind::Histogram,
                    },
                )
                .with_tags(Some(metric_tags!("os.host" => "somehost")))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 2)
                        .single()
                        .expect("invalid timestamp"),
                )),
            ),
        ];

        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        // Collect exactly as many requests as metrics were sent, then compare each body.
        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"os.host":"somehost"}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
        assert_eq!(
            r#"{"event":{"distribution":{"samples":[{"rate":100,"value":1.0},{"rate":200,"value":2.0},{"rate":300,"value":3.0}],"statistic":"histogram"},"kind":"absolute","name":"metric2","tags":{"os.host":"somehost"}},"fields":{},"time":1597784402.0}"#,
            output[1].1
        );
    }

    // With `metric_tag_values = "full"`, a tag that carries several values is expected to be
    // emitted as a JSON array of all of them.
    #[tokio::test]
    async fn multi_value_tags() {
        let (mut config, cx) = load_sink::<HumioMetricsConfig>(indoc! {r#"
            token = "atoken"
            batch.max_events = 1
            metric_tag_values = "full"
        "#})
        .unwrap();

        let addr = test_util::next_addr();
        // Swap out the endpoint so we can force send it
        // to our local server
        config.endpoint = format!("http://{addr}");

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server(addr);
        tokio::spawn(server);

        // Make our test metrics.
        let metrics = vec![Event::from(
            Metric::new(
                "metric1",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_tags(Some(metric_tags!(
                "code" => "200",
                "code" => "success"
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 1)
                    .single()
                    .expect("invalid timestamp"),
            )),
        )];

        let len = metrics.len();
        run_and_assert_sink_compliance(sink, stream::iter(metrics), &HTTP_SINK_TAGS).await;

        let output = rx.take(len).collect::<Vec<_>>().await;
        assert_eq!(
            r#"{"event":{"counter":{"value":42.0},"kind":"incremental","name":"metric1","tags":{"code":["200","success"]}},"fields":{},"time":1597784401.0}"#,
            output[0].1
        );
    }
}