vector/sinks/splunk_hec/metrics/
sink.rs1use std::{fmt, sync::Arc};
2
3use serde::Serialize;
4use vector_lib::event::{Metric, MetricValue};
5use vrl::path::OwnedValuePath;
6
7use super::request_builder::HecMetricsRequestBuilder;
8use crate::{
9 internal_events::SplunkInvalidMetricReceivedError,
10 sinks::{
11 prelude::*,
12 splunk_hec::common::{render_template_string, request::HecRequest},
13 util::{encode_namespace, processed_event::ProcessedEvent},
14 },
15};
16
17pub struct HecMetricsSink<S> {
18 pub service: S,
19 pub batch_settings: BatcherSettings,
20 pub request_builder: HecMetricsRequestBuilder,
21 pub sourcetype: Option<Template>,
22 pub source: Option<Template>,
23 pub index: Option<Template>,
24 pub host_key: Option<OwnedValuePath>,
25 pub default_namespace: Option<String>,
26}
27
28impl<S> HecMetricsSink<S>
29where
30 S: Service<HecRequest> + Send + 'static,
31 S::Future: Send + 'static,
32 S::Response: DriverResponse + Send + 'static,
33 S::Error: fmt::Debug + Into<crate::Error> + Send,
34{
35 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36 let sourcetype = self.sourcetype.as_ref();
37 let source = self.source.as_ref();
38 let index = self.index.as_ref();
39 let host_key = self.host_key.as_ref();
40 let default_namespace = self.default_namespace.as_deref();
41 let batch_settings = self.batch_settings;
42
43 input
44 .map(|event| (event.size_of(), event.into_metric()))
45 .filter_map(move |(event_byte_size, metric)| {
46 future::ready(process_metric(
47 metric,
48 event_byte_size,
49 sourcetype,
50 source,
51 index,
52 host_key,
53 default_namespace,
54 ))
55 })
56 .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
57 .request_builder(
58 default_request_builder_concurrency_limit(),
59 self.request_builder,
60 )
61 .filter_map(|request| async move {
62 match request {
63 Err(e) => {
64 error!("Failed to build HEC Metrics request: {:?}.", e);
65 None
66 }
67 Ok(req) => Some(req),
68 }
69 })
70 .into_driver(self.service)
71 .run()
72 .await
73 }
74}
75
76#[async_trait]
77impl<S> StreamSink<Event> for HecMetricsSink<S>
78where
79 S: Service<HecRequest> + Send + 'static,
80 S::Future: Send + 'static,
81 S::Response: DriverResponse + Send + 'static,
82 S::Error: fmt::Debug + Into<crate::Error> + Send,
83{
84 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
85 self.run_inner(input).await
86 }
87}
88
89#[derive(Default)]
90struct EventPartitioner;
91
92impl Partitioner for EventPartitioner {
93 type Item = HecProcessedEvent;
94 type Key = Option<Arc<str>>;
95
96 fn partition(&self, item: &Self::Item) -> Self::Key {
97 item.event.metadata().splunk_hec_token()
98 }
99}
100
101#[derive(Serialize)]
102pub struct HecMetricsProcessedEventMetadata {
103 pub event_byte_size: usize,
104 pub sourcetype: Option<String>,
105 pub source: Option<String>,
106 pub index: Option<String>,
107 pub host: Option<String>,
108 pub metric_name: String,
109 pub metric_value: f64,
110 pub templated_field_keys: Vec<String>,
111}
112
113impl ByteSizeOf for HecMetricsProcessedEventMetadata {
114 fn allocated_bytes(&self) -> usize {
115 self.sourcetype.allocated_bytes()
116 + self.source.allocated_bytes()
117 + self.index.allocated_bytes()
118 + self.host.allocated_bytes()
119 + self.metric_name.allocated_bytes()
120 + self.templated_field_keys.allocated_bytes()
121 }
122}
123
124impl HecMetricsProcessedEventMetadata {
125 fn extract_metric_name(metric: &Metric, default_namespace: Option<&str>) -> String {
126 encode_namespace(metric.namespace().or(default_namespace), '.', metric.name())
127 }
128
129 fn extract_metric_value(metric: &Metric) -> Option<f64> {
130 match *metric.value() {
131 MetricValue::Counter { value } => Some(value),
132 MetricValue::Gauge { value } => Some(value),
133 _ => {
134 emit!(SplunkInvalidMetricReceivedError {
135 value: metric.value(),
136 kind: &metric.kind(),
137 error: "Metric kind not supported.".into(),
138 });
139 None
140 }
141 }
142 }
143}
144
145pub type HecProcessedEvent = ProcessedEvent<Metric, HecMetricsProcessedEventMetadata>;
146
147pub fn process_metric(
148 metric: Metric,
149 event_byte_size: usize,
150 sourcetype: Option<&Template>,
151 source: Option<&Template>,
152 index: Option<&Template>,
153 host_key: Option<&OwnedValuePath>,
154 default_namespace: Option<&str>,
155) -> Option<HecProcessedEvent> {
156 let templated_field_keys = [index.as_ref(), source.as_ref(), sourcetype.as_ref()]
157 .iter()
158 .flatten()
159 .filter_map(|t| t.get_fields())
160 .flatten()
161 .map(|f| f.replace("tags.", ""))
162 .collect::<Vec<_>>();
163 let metric_name =
164 HecMetricsProcessedEventMetadata::extract_metric_name(&metric, default_namespace);
165 let metric_value = HecMetricsProcessedEventMetadata::extract_metric_value(&metric)?;
166
167 let sourcetype =
168 sourcetype.and_then(|sourcetype| render_template_string(sourcetype, &metric, "sourcetype"));
169 let source = source.and_then(|source| render_template_string(source, &metric, "source"));
170 let index = index.and_then(|index| render_template_string(index, &metric, "index"));
171 let host = host_key.and_then(|key| metric.tag_value(key.to_string().as_str()));
172
173 let metadata = HecMetricsProcessedEventMetadata {
174 event_byte_size,
175 sourcetype,
176 source,
177 index,
178 host,
179 metric_name,
180 metric_value,
181 templated_field_keys,
182 };
183
184 Some(HecProcessedEvent {
185 event: metric,
186 metadata,
187 })
188}
189
190impl EventCount for HecProcessedEvent {
191 fn event_count(&self) -> usize {
192 1
194 }
195}