use std::collections::BTreeMap;
use std::num::NonZeroU32;
use std::time::Duration;
use chrono::Utc;
use futures::StreamExt;
use serde_with::serde_as;
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
};
use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
use crate::{
config::{SourceConfig, SourceContext, SourceOutput},
event::{
metric::{MetricData, MetricName, MetricSeries, MetricTime, MetricValue},
EventMetadata, Metric, MetricKind,
},
internal_events::{EventsReceived, StreamClosedError},
shutdown::ShutdownSignal,
SourceSender,
};
#[serde_as]
#[configurable_component(source(
"static_metrics",
"Produce static metrics defined in configuration."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StaticMetricsConfig {
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
#[serde(default = "default_interval")]
#[configurable(metadata(docs::human_name = "Emitting interval"))]
pub interval_secs: Duration,
#[serde(default = "default_namespace")]
pub namespace: String,
#[configurable(derived)]
#[serde(default)]
pub metrics: Vec<StaticMetricConfig>,
}
impl Default for StaticMetricsConfig {
fn default() -> Self {
Self {
interval_secs: default_interval(),
metrics: Vec::default(),
namespace: default_namespace(),
}
}
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StaticMetricConfig {
pub name: String,
pub value: MetricValue,
pub kind: MetricKind,
#[configurable(metadata(
docs::additional_props_description = "An individual tag - value pair."
))]
pub tags: BTreeMap<String, String>,
}
fn default_interval() -> Duration {
Duration::from_secs_f64(1.0)
}
fn default_namespace() -> String {
"static".to_owned()
}
impl_generate_config_from_default!(StaticMetricsConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "static_metrics")]
impl SourceConfig for StaticMetricsConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
if self.interval_secs.is_zero() {
warn!(
"Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
);
}
let interval = self.interval_secs;
let namespace = self.namespace.clone();
let metrics = self.metrics.clone();
Ok(Box::pin(
StaticMetrics {
namespace,
metrics,
interval,
out: cx.out,
shutdown: cx.shutdown,
}
.run(),
))
}
fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
vec![SourceOutput::new_metrics()]
}
fn can_acknowledge(&self) -> bool {
false
}
}
struct StaticMetrics {
namespace: String,
metrics: Vec<StaticMetricConfig>,
interval: time::Duration,
out: SourceSender,
shutdown: ShutdownSignal,
}
impl StaticMetrics {
async fn run(mut self) -> Result<(), ()> {
let events_received = register!(EventsReceived);
let bytes_received = register!(BytesReceived::from(Protocol::STATIC));
let mut interval =
IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
let metrics: Vec<Metric> = self
.metrics
.into_iter()
.map(
|StaticMetricConfig {
name,
value,
kind,
tags,
}| {
Metric::from_parts(
MetricSeries {
name: MetricName {
name,
namespace: Some(self.namespace.clone()),
},
tags: Some(tags.into()),
},
MetricData {
time: MetricTime {
timestamp: None,
interval_ms: NonZeroU32::new(self.interval.as_millis() as u32),
},
kind,
value: value.clone(),
},
EventMetadata::default(),
)
},
)
.collect();
while interval.next().await.is_some() {
let count = metrics.len();
let byte_size = metrics.size_of();
let json_size = metrics.estimated_json_encoded_size_of();
bytes_received.emit(ByteSize(byte_size));
events_received.emit(CountByteSize(count, json_size));
let batch = metrics
.clone()
.into_iter()
.map(|metric| metric.with_timestamp(Some(Utc::now())));
if (self.out.send_batch(batch).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
event::Event,
test_util::{
self,
components::{run_and_assert_source_compliance, SOURCE_TAGS},
},
};
#[test]
fn generate_config() {
test_util::test_generate_config::<StaticMetricsConfig>();
}
async fn events_from_config(config: StaticMetricsConfig) -> Vec<Event> {
run_and_assert_source_compliance(config, time::Duration::from_millis(100), &SOURCE_TAGS)
.await
}
fn default_metric() -> StaticMetricConfig {
StaticMetricConfig {
name: "".to_string(),
value: MetricValue::Gauge { value: 0.0 },
kind: MetricKind::Absolute,
tags: BTreeMap::default(),
}
}
#[tokio::test]
async fn default_empty() {
let events = events_from_config(StaticMetricsConfig::default()).await;
assert!(events.is_empty());
}
#[tokio::test]
async fn default_namespace() {
let mut events = events_from_config(StaticMetricsConfig {
metrics: vec![default_metric()],
..Default::default()
})
.await;
assert!(!events.is_empty());
let event = events.remove(0);
assert_eq!(event.as_metric().namespace(), Some("static"));
}
#[tokio::test]
async fn default_namespace_multiple_events() {
let mut events = events_from_config(StaticMetricsConfig {
metrics: vec![default_metric(), default_metric()],
..Default::default()
})
.await;
assert!(!events.is_empty());
let event = events.remove(0);
assert_eq!(event.as_metric().namespace(), Some("static"));
let event = events.remove(0);
assert_eq!(event.as_metric().namespace(), Some("static"));
}
#[tokio::test]
async fn namespace() {
let namespace = "totally_custom";
let config = StaticMetricsConfig {
namespace: namespace.to_owned(),
metrics: vec![default_metric()],
..StaticMetricsConfig::default()
};
let mut events = events_from_config(config).await;
assert!(!events.is_empty());
let event = events.remove(0);
assert_eq!(event.as_metric().namespace(), Some(namespace));
}
#[tokio::test]
async fn sets_custom_tags() {
let mut events = events_from_config(StaticMetricsConfig {
metrics: vec![StaticMetricConfig {
name: "test".to_string(),
value: MetricValue::Gauge { value: 2.3 },
kind: MetricKind::Absolute,
tags: BTreeMap::from([("custom_tag".to_string(), "custom_tag_value".to_string())]),
}],
..Default::default()
})
.await;
assert!(!events.is_empty());
let event = events.remove(0);
let metric = event.as_metric();
assert_eq!(metric.name(), "test");
assert!(matches!(metric.value(), MetricValue::Gauge { value: 2.3 }));
assert_eq!(
metric.tag_value("custom_tag"),
Some("custom_tag_value".to_string())
);
}
}