vector/sinks/splunk_hec/metrics/
sink.rs

1use std::{fmt, sync::Arc};
2
3use serde::Serialize;
4use vector_lib::event::{Metric, MetricValue};
5use vrl::path::OwnedValuePath;
6
7use super::request_builder::HecMetricsRequestBuilder;
8use crate::{
9    internal_events::SplunkInvalidMetricReceivedError,
10    sinks::{
11        prelude::*,
12        splunk_hec::common::{render_template_string, request::HecRequest},
13        util::{encode_namespace, processed_event::ProcessedEvent},
14    },
15};
16
17pub struct HecMetricsSink<S> {
18    pub service: S,
19    pub batch_settings: BatcherSettings,
20    pub request_builder: HecMetricsRequestBuilder,
21    pub sourcetype: Option<Template>,
22    pub source: Option<Template>,
23    pub index: Option<Template>,
24    pub host_key: Option<OwnedValuePath>,
25    pub default_namespace: Option<String>,
26}
27
28impl<S> HecMetricsSink<S>
29where
30    S: Service<HecRequest> + Send + 'static,
31    S::Future: Send + 'static,
32    S::Response: DriverResponse + Send + 'static,
33    S::Error: fmt::Debug + Into<crate::Error> + Send,
34{
35    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36        let sourcetype = self.sourcetype.as_ref();
37        let source = self.source.as_ref();
38        let index = self.index.as_ref();
39        let host_key = self.host_key.as_ref();
40        let default_namespace = self.default_namespace.as_deref();
41        let batch_settings = self.batch_settings;
42
43        input
44            .map(|event| (event.size_of(), event.into_metric()))
45            .filter_map(move |(event_byte_size, metric)| {
46                future::ready(process_metric(
47                    metric,
48                    event_byte_size,
49                    sourcetype,
50                    source,
51                    index,
52                    host_key,
53                    default_namespace,
54                ))
55            })
56            .batched_partitioned(EventPartitioner, batch_settings.timeout, |_| {
57                batch_settings.as_byte_size_config()
58            })
59            .request_builder(
60                default_request_builder_concurrency_limit(),
61                self.request_builder,
62            )
63            .filter_map(|request| async move {
64                match request {
65                    Err(e) => {
66                        error!("Failed to build HEC Metrics request: {:?}.", e);
67                        None
68                    }
69                    Ok(req) => Some(req),
70                }
71            })
72            .into_driver(self.service)
73            .run()
74            .await
75    }
76}
77
78#[async_trait]
79impl<S> StreamSink<Event> for HecMetricsSink<S>
80where
81    S: Service<HecRequest> + Send + 'static,
82    S::Future: Send + 'static,
83    S::Response: DriverResponse + Send + 'static,
84    S::Error: fmt::Debug + Into<crate::Error> + Send,
85{
86    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
87        self.run_inner(input).await
88    }
89}
90
91#[derive(Default)]
92struct EventPartitioner;
93
94impl Partitioner for EventPartitioner {
95    type Item = HecProcessedEvent;
96    type Key = Option<Arc<str>>;
97
98    fn partition(&self, item: &Self::Item) -> Self::Key {
99        item.event.metadata().splunk_hec_token()
100    }
101}
102
103#[derive(Serialize)]
104pub struct HecMetricsProcessedEventMetadata {
105    pub event_byte_size: usize,
106    pub sourcetype: Option<String>,
107    pub source: Option<String>,
108    pub index: Option<String>,
109    pub host: Option<String>,
110    pub metric_name: String,
111    pub metric_value: f64,
112    pub templated_field_keys: Vec<String>,
113}
114
115impl ByteSizeOf for HecMetricsProcessedEventMetadata {
116    fn allocated_bytes(&self) -> usize {
117        self.sourcetype.allocated_bytes()
118            + self.source.allocated_bytes()
119            + self.index.allocated_bytes()
120            + self.host.allocated_bytes()
121            + self.metric_name.allocated_bytes()
122            + self.templated_field_keys.allocated_bytes()
123    }
124}
125
126impl HecMetricsProcessedEventMetadata {
127    fn extract_metric_name(metric: &Metric, default_namespace: Option<&str>) -> String {
128        encode_namespace(metric.namespace().or(default_namespace), '.', metric.name())
129    }
130
131    fn extract_metric_value(metric: &Metric) -> Option<f64> {
132        match *metric.value() {
133            MetricValue::Counter { value } => Some(value),
134            MetricValue::Gauge { value } => Some(value),
135            _ => {
136                emit!(SplunkInvalidMetricReceivedError {
137                    value: metric.value(),
138                    kind: &metric.kind(),
139                    error: "Metric kind not supported.".into(),
140                });
141                None
142            }
143        }
144    }
145}
146
147pub type HecProcessedEvent = ProcessedEvent<Metric, HecMetricsProcessedEventMetadata>;
148
149pub fn process_metric(
150    metric: Metric,
151    event_byte_size: usize,
152    sourcetype: Option<&Template>,
153    source: Option<&Template>,
154    index: Option<&Template>,
155    host_key: Option<&OwnedValuePath>,
156    default_namespace: Option<&str>,
157) -> Option<HecProcessedEvent> {
158    let templated_field_keys = [index.as_ref(), source.as_ref(), sourcetype.as_ref()]
159        .iter()
160        .flatten()
161        .filter_map(|t| t.get_fields())
162        .flatten()
163        .map(|f| f.replace("tags.", ""))
164        .collect::<Vec<_>>();
165    let metric_name =
166        HecMetricsProcessedEventMetadata::extract_metric_name(&metric, default_namespace);
167    let metric_value = HecMetricsProcessedEventMetadata::extract_metric_value(&metric)?;
168
169    let sourcetype =
170        sourcetype.and_then(|sourcetype| render_template_string(sourcetype, &metric, "sourcetype"));
171    let source = source.and_then(|source| render_template_string(source, &metric, "source"));
172    let index = index.and_then(|index| render_template_string(index, &metric, "index"));
173    let host = host_key.and_then(|key| metric.tag_value(key.to_string().as_str()));
174
175    let metadata = HecMetricsProcessedEventMetadata {
176        event_byte_size,
177        sourcetype,
178        source,
179        index,
180        host,
181        metric_name,
182        metric_value,
183        templated_field_keys,
184    };
185
186    Some(HecProcessedEvent {
187        event: metric,
188        metadata,
189    })
190}
191
192impl EventCount for HecProcessedEvent {
193    fn event_count(&self) -> usize {
194        // A HecProcessedEvent is mapped one-to-one with an event.
195        1
196    }
197}