vector/sinks/splunk_hec/metrics/
sink.rs1use std::{fmt, sync::Arc};
2
3use serde::Serialize;
4use vector_lib::event::{Metric, MetricValue};
5use vrl::path::OwnedValuePath;
6
7use super::request_builder::HecMetricsRequestBuilder;
8use crate::{
9 internal_events::SplunkInvalidMetricReceivedError,
10 sinks::{
11 prelude::*,
12 splunk_hec::common::{render_template_string, request::HecRequest},
13 util::{encode_namespace, processed_event::ProcessedEvent},
14 },
15};
16
17pub struct HecMetricsSink<S> {
18 pub service: S,
19 pub batch_settings: BatcherSettings,
20 pub request_builder: HecMetricsRequestBuilder,
21 pub sourcetype: Option<Template>,
22 pub source: Option<Template>,
23 pub index: Option<Template>,
24 pub host_key: Option<OwnedValuePath>,
25 pub default_namespace: Option<String>,
26}
27
28impl<S> HecMetricsSink<S>
29where
30 S: Service<HecRequest> + Send + 'static,
31 S::Future: Send + 'static,
32 S::Response: DriverResponse + Send + 'static,
33 S::Error: fmt::Debug + Into<crate::Error> + Send,
34{
35 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36 let sourcetype = self.sourcetype.as_ref();
37 let source = self.source.as_ref();
38 let index = self.index.as_ref();
39 let host_key = self.host_key.as_ref();
40 let default_namespace = self.default_namespace.as_deref();
41 let batch_settings = self.batch_settings;
42
43 input
44 .map(|event| (event.size_of(), event.into_metric()))
45 .filter_map(move |(event_byte_size, metric)| {
46 future::ready(process_metric(
47 metric,
48 event_byte_size,
49 sourcetype,
50 source,
51 index,
52 host_key,
53 default_namespace,
54 ))
55 })
56 .batched_partitioned(EventPartitioner, batch_settings.timeout, |_| {
57 batch_settings.as_byte_size_config()
58 })
59 .request_builder(
60 default_request_builder_concurrency_limit(),
61 self.request_builder,
62 )
63 .filter_map(|request| async move {
64 match request {
65 Err(e) => {
66 error!("Failed to build HEC Metrics request: {:?}.", e);
67 None
68 }
69 Ok(req) => Some(req),
70 }
71 })
72 .into_driver(self.service)
73 .run()
74 .await
75 }
76}
77
78#[async_trait]
79impl<S> StreamSink<Event> for HecMetricsSink<S>
80where
81 S: Service<HecRequest> + Send + 'static,
82 S::Future: Send + 'static,
83 S::Response: DriverResponse + Send + 'static,
84 S::Error: fmt::Debug + Into<crate::Error> + Send,
85{
86 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
87 self.run_inner(input).await
88 }
89}
90
/// Stateless partitioner used to group processed metric events into batches.
#[derive(Default)]
struct EventPartitioner;
93
94impl Partitioner for EventPartitioner {
95 type Item = HecProcessedEvent;
96 type Key = Option<Arc<str>>;
97
98 fn partition(&self, item: &Self::Item) -> Self::Key {
99 item.event.metadata().splunk_hec_token()
100 }
101}
102
103#[derive(Serialize)]
104pub struct HecMetricsProcessedEventMetadata {
105 pub event_byte_size: usize,
106 pub sourcetype: Option<String>,
107 pub source: Option<String>,
108 pub index: Option<String>,
109 pub host: Option<String>,
110 pub metric_name: String,
111 pub metric_value: f64,
112 pub templated_field_keys: Vec<String>,
113}
114
115impl ByteSizeOf for HecMetricsProcessedEventMetadata {
116 fn allocated_bytes(&self) -> usize {
117 self.sourcetype.allocated_bytes()
118 + self.source.allocated_bytes()
119 + self.index.allocated_bytes()
120 + self.host.allocated_bytes()
121 + self.metric_name.allocated_bytes()
122 + self.templated_field_keys.allocated_bytes()
123 }
124}
125
126impl HecMetricsProcessedEventMetadata {
127 fn extract_metric_name(metric: &Metric, default_namespace: Option<&str>) -> String {
128 encode_namespace(metric.namespace().or(default_namespace), '.', metric.name())
129 }
130
131 fn extract_metric_value(metric: &Metric) -> Option<f64> {
132 match *metric.value() {
133 MetricValue::Counter { value } => Some(value),
134 MetricValue::Gauge { value } => Some(value),
135 _ => {
136 emit!(SplunkInvalidMetricReceivedError {
137 value: metric.value(),
138 kind: &metric.kind(),
139 error: "Metric kind not supported.".into(),
140 });
141 None
142 }
143 }
144 }
145}
146
147pub type HecProcessedEvent = ProcessedEvent<Metric, HecMetricsProcessedEventMetadata>;
148
149pub fn process_metric(
150 metric: Metric,
151 event_byte_size: usize,
152 sourcetype: Option<&Template>,
153 source: Option<&Template>,
154 index: Option<&Template>,
155 host_key: Option<&OwnedValuePath>,
156 default_namespace: Option<&str>,
157) -> Option<HecProcessedEvent> {
158 let templated_field_keys = [index.as_ref(), source.as_ref(), sourcetype.as_ref()]
159 .iter()
160 .flatten()
161 .filter_map(|t| t.get_fields())
162 .flatten()
163 .map(|f| f.replace("tags.", ""))
164 .collect::<Vec<_>>();
165 let metric_name =
166 HecMetricsProcessedEventMetadata::extract_metric_name(&metric, default_namespace);
167 let metric_value = HecMetricsProcessedEventMetadata::extract_metric_value(&metric)?;
168
169 let sourcetype =
170 sourcetype.and_then(|sourcetype| render_template_string(sourcetype, &metric, "sourcetype"));
171 let source = source.and_then(|source| render_template_string(source, &metric, "source"));
172 let index = index.and_then(|index| render_template_string(index, &metric, "index"));
173 let host = host_key.and_then(|key| metric.tag_value(key.to_string().as_str()));
174
175 let metadata = HecMetricsProcessedEventMetadata {
176 event_byte_size,
177 sourcetype,
178 source,
179 index,
180 host,
181 metric_name,
182 metric_value,
183 templated_field_keys,
184 };
185
186 Some(HecProcessedEvent {
187 event: metric,
188 metadata,
189 })
190}
191
192impl EventCount for HecProcessedEvent {
193 fn event_count(&self) -> usize {
194 1
196 }
197}