vector/sinks/splunk_hec/metrics/
sink.rs

1use std::{fmt, sync::Arc};
2
3use serde::Serialize;
4use vector_lib::event::{Metric, MetricValue};
5use vrl::path::OwnedValuePath;
6
7use super::request_builder::HecMetricsRequestBuilder;
8use crate::{
9    internal_events::SplunkInvalidMetricReceivedError,
10    sinks::{
11        prelude::*,
12        splunk_hec::common::{render_template_string, request::HecRequest},
13        util::{encode_namespace, processed_event::ProcessedEvent},
14    },
15};
16
17pub struct HecMetricsSink<S> {
18    pub service: S,
19    pub batch_settings: BatcherSettings,
20    pub request_builder: HecMetricsRequestBuilder,
21    pub sourcetype: Option<Template>,
22    pub source: Option<Template>,
23    pub index: Option<Template>,
24    pub host_key: Option<OwnedValuePath>,
25    pub default_namespace: Option<String>,
26}
27
28impl<S> HecMetricsSink<S>
29where
30    S: Service<HecRequest> + Send + 'static,
31    S::Future: Send + 'static,
32    S::Response: DriverResponse + Send + 'static,
33    S::Error: fmt::Debug + Into<crate::Error> + Send,
34{
35    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36        let sourcetype = self.sourcetype.as_ref();
37        let source = self.source.as_ref();
38        let index = self.index.as_ref();
39        let host_key = self.host_key.as_ref();
40        let default_namespace = self.default_namespace.as_deref();
41        let batch_settings = self.batch_settings;
42
43        input
44            .map(|event| (event.size_of(), event.into_metric()))
45            .filter_map(move |(event_byte_size, metric)| {
46                future::ready(process_metric(
47                    metric,
48                    event_byte_size,
49                    sourcetype,
50                    source,
51                    index,
52                    host_key,
53                    default_namespace,
54                ))
55            })
56            .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
57            .request_builder(
58                default_request_builder_concurrency_limit(),
59                self.request_builder,
60            )
61            .filter_map(|request| async move {
62                match request {
63                    Err(e) => {
64                        error!("Failed to build HEC Metrics request: {:?}.", e);
65                        None
66                    }
67                    Ok(req) => Some(req),
68                }
69            })
70            .into_driver(self.service)
71            .run()
72            .await
73    }
74}
75
76#[async_trait]
77impl<S> StreamSink<Event> for HecMetricsSink<S>
78where
79    S: Service<HecRequest> + Send + 'static,
80    S::Future: Send + 'static,
81    S::Response: DriverResponse + Send + 'static,
82    S::Error: fmt::Debug + Into<crate::Error> + Send,
83{
84    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
85        self.run_inner(input).await
86    }
87}
88
89#[derive(Default)]
90struct EventPartitioner;
91
92impl Partitioner for EventPartitioner {
93    type Item = HecProcessedEvent;
94    type Key = Option<Arc<str>>;
95
96    fn partition(&self, item: &Self::Item) -> Self::Key {
97        item.event.metadata().splunk_hec_token()
98    }
99}
100
101#[derive(Serialize)]
102pub struct HecMetricsProcessedEventMetadata {
103    pub event_byte_size: usize,
104    pub sourcetype: Option<String>,
105    pub source: Option<String>,
106    pub index: Option<String>,
107    pub host: Option<String>,
108    pub metric_name: String,
109    pub metric_value: f64,
110    pub templated_field_keys: Vec<String>,
111}
112
113impl ByteSizeOf for HecMetricsProcessedEventMetadata {
114    fn allocated_bytes(&self) -> usize {
115        self.sourcetype.allocated_bytes()
116            + self.source.allocated_bytes()
117            + self.index.allocated_bytes()
118            + self.host.allocated_bytes()
119            + self.metric_name.allocated_bytes()
120            + self.templated_field_keys.allocated_bytes()
121    }
122}
123
124impl HecMetricsProcessedEventMetadata {
125    fn extract_metric_name(metric: &Metric, default_namespace: Option<&str>) -> String {
126        encode_namespace(metric.namespace().or(default_namespace), '.', metric.name())
127    }
128
129    fn extract_metric_value(metric: &Metric) -> Option<f64> {
130        match *metric.value() {
131            MetricValue::Counter { value } => Some(value),
132            MetricValue::Gauge { value } => Some(value),
133            _ => {
134                emit!(SplunkInvalidMetricReceivedError {
135                    value: metric.value(),
136                    kind: &metric.kind(),
137                    error: "Metric kind not supported.".into(),
138                });
139                None
140            }
141        }
142    }
143}
144
145pub type HecProcessedEvent = ProcessedEvent<Metric, HecMetricsProcessedEventMetadata>;
146
147pub fn process_metric(
148    metric: Metric,
149    event_byte_size: usize,
150    sourcetype: Option<&Template>,
151    source: Option<&Template>,
152    index: Option<&Template>,
153    host_key: Option<&OwnedValuePath>,
154    default_namespace: Option<&str>,
155) -> Option<HecProcessedEvent> {
156    let templated_field_keys = [index.as_ref(), source.as_ref(), sourcetype.as_ref()]
157        .iter()
158        .flatten()
159        .filter_map(|t| t.get_fields())
160        .flatten()
161        .map(|f| f.replace("tags.", ""))
162        .collect::<Vec<_>>();
163    let metric_name =
164        HecMetricsProcessedEventMetadata::extract_metric_name(&metric, default_namespace);
165    let metric_value = HecMetricsProcessedEventMetadata::extract_metric_value(&metric)?;
166
167    let sourcetype =
168        sourcetype.and_then(|sourcetype| render_template_string(sourcetype, &metric, "sourcetype"));
169    let source = source.and_then(|source| render_template_string(source, &metric, "source"));
170    let index = index.and_then(|index| render_template_string(index, &metric, "index"));
171    let host = host_key.and_then(|key| metric.tag_value(key.to_string().as_str()));
172
173    let metadata = HecMetricsProcessedEventMetadata {
174        event_byte_size,
175        sourcetype,
176        source,
177        index,
178        host,
179        metric_name,
180        metric_value,
181        templated_field_keys,
182    };
183
184    Some(HecProcessedEvent {
185        event: metric,
186        metadata,
187    })
188}
189
190impl EventCount for HecProcessedEvent {
191    fn event_count(&self) -> usize {
192        // A HecProcessedEvent is mapped one-to-one with an event.
193        1
194    }
195}