use std::{fmt, sync::Arc};
use serde::Serialize;
use vector_lib::event::{Metric, MetricValue};
use vrl::path::OwnedValuePath;
use super::request_builder::HecMetricsRequestBuilder;
use crate::{
internal_events::SplunkInvalidMetricReceivedError,
sinks::{
prelude::*,
splunk_hec::common::{render_template_string, request::HecRequest},
util::{encode_namespace, processed_event::ProcessedEvent},
},
};
pub struct HecMetricsSink<S> {
pub service: S,
pub batch_settings: BatcherSettings,
pub request_builder: HecMetricsRequestBuilder,
pub sourcetype: Option<Template>,
pub source: Option<Template>,
pub index: Option<Template>,
pub host_key: Option<OwnedValuePath>,
pub default_namespace: Option<String>,
}
impl<S> HecMetricsSink<S>
where
S: Service<HecRequest> + Send + 'static,
S::Future: Send + 'static,
S::Response: DriverResponse + Send + 'static,
S::Error: fmt::Debug + Into<crate::Error> + Send,
{
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let sourcetype = self.sourcetype.as_ref();
let source = self.source.as_ref();
let index = self.index.as_ref();
let host_key = self.host_key.as_ref();
let default_namespace = self.default_namespace.as_deref();
let batch_settings = self.batch_settings;
input
.map(|event| (event.size_of(), event.into_metric()))
.filter_map(move |(event_byte_size, metric)| {
future::ready(process_metric(
metric,
event_byte_size,
sourcetype,
source,
index,
host_key,
default_namespace,
))
})
.batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
.request_builder(
default_request_builder_concurrency_limit(),
self.request_builder,
)
.filter_map(|request| async move {
match request {
Err(e) => {
error!("Failed to build HEC Metrics request: {:?}.", e);
None
}
Ok(req) => Some(req),
}
})
.into_driver(self.service)
.run()
.await
}
}
#[async_trait]
impl<S> StreamSink<Event> for HecMetricsSink<S>
where
S: Service<HecRequest> + Send + 'static,
S::Future: Send + 'static,
S::Response: DriverResponse + Send + 'static,
S::Error: fmt::Debug + Into<crate::Error> + Send,
{
async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
self.run_inner(input).await
}
}
#[derive(Default)]
struct EventPartitioner;
impl Partitioner for EventPartitioner {
type Item = HecProcessedEvent;
type Key = Option<Arc<str>>;
fn partition(&self, item: &Self::Item) -> Self::Key {
item.event.metadata().splunk_hec_token()
}
}
#[derive(Serialize)]
pub struct HecMetricsProcessedEventMetadata {
pub event_byte_size: usize,
pub sourcetype: Option<String>,
pub source: Option<String>,
pub index: Option<String>,
pub host: Option<String>,
pub metric_name: String,
pub metric_value: f64,
pub templated_field_keys: Vec<String>,
}
impl ByteSizeOf for HecMetricsProcessedEventMetadata {
fn allocated_bytes(&self) -> usize {
self.sourcetype.allocated_bytes()
+ self.source.allocated_bytes()
+ self.index.allocated_bytes()
+ self.host.allocated_bytes()
+ self.metric_name.allocated_bytes()
+ self.templated_field_keys.allocated_bytes()
}
}
impl HecMetricsProcessedEventMetadata {
fn extract_metric_name(metric: &Metric, default_namespace: Option<&str>) -> String {
encode_namespace(metric.namespace().or(default_namespace), '.', metric.name())
}
fn extract_metric_value(metric: &Metric) -> Option<f64> {
match *metric.value() {
MetricValue::Counter { value } => Some(value),
MetricValue::Gauge { value } => Some(value),
_ => {
emit!(SplunkInvalidMetricReceivedError {
value: metric.value(),
kind: &metric.kind(),
error: "Metric kind not supported.".into(),
});
None
}
}
}
}
pub type HecProcessedEvent = ProcessedEvent<Metric, HecMetricsProcessedEventMetadata>;
pub fn process_metric(
metric: Metric,
event_byte_size: usize,
sourcetype: Option<&Template>,
source: Option<&Template>,
index: Option<&Template>,
host_key: Option<&OwnedValuePath>,
default_namespace: Option<&str>,
) -> Option<HecProcessedEvent> {
let templated_field_keys = [index.as_ref(), source.as_ref(), sourcetype.as_ref()]
.iter()
.flatten()
.filter_map(|t| t.get_fields())
.flatten()
.map(|f| f.replace("tags.", ""))
.collect::<Vec<_>>();
let metric_name =
HecMetricsProcessedEventMetadata::extract_metric_name(&metric, default_namespace);
let metric_value = HecMetricsProcessedEventMetadata::extract_metric_value(&metric)?;
let sourcetype =
sourcetype.and_then(|sourcetype| render_template_string(sourcetype, &metric, "sourcetype"));
let source = source.and_then(|source| render_template_string(source, &metric, "source"));
let index = index.and_then(|index| render_template_string(index, &metric, "index"));
let host = host_key.and_then(|key| metric.tag_value(key.to_string().as_str()));
let metadata = HecMetricsProcessedEventMetadata {
event_byte_size,
sourcetype,
source,
index,
host,
metric_name,
metric_value,
templated_field_keys,
};
Some(HecProcessedEvent {
event: metric,
metadata,
})
}
impl EventCount for HecProcessedEvent {
fn event_count(&self) -> usize {
1
}
}