use std::collections::{BTreeMap, HashSet};
use async_stream::stream;
use tokio::time::Duration;
use tokio_stream::{Stream, StreamExt};
use super::{
filter_output_metric, OutputThroughput, ReceivedBytesTotal, ReceivedEventsTotal,
SentBytesTotal, SentEventsTotal,
};
use crate::{
config::ComponentKey,
event::{Metric, MetricValue},
metrics::Controller,
};
fn get_controller() -> &'static Controller {
Controller::get().expect("Metrics system not initialized. Please report.")
}
pub fn sum_metrics<'a, I: IntoIterator<Item = &'a Metric>>(metrics: I) -> Option<Metric> {
let mut iter = metrics.into_iter();
let m = iter.next()?;
Some(iter.fold(
m.clone(),
|mut m1, m2| {
if m1.update(m2) {
m1
} else {
m2.clone()
}
},
))
}
fn sum_metrics_owned<I: IntoIterator<Item = Metric>>(metrics: I) -> Option<Metric> {
let mut iter = metrics.into_iter();
let m = iter.next()?;
Some(iter.fold(m, |mut m1, m2| if m1.update(&m2) { m1 } else { m2 }))
}
pub trait MetricsFilter<'a> {
fn received_bytes_total(&self) -> Option<ReceivedBytesTotal>;
fn received_events_total(&self) -> Option<ReceivedEventsTotal>;
fn sent_bytes_total(&self) -> Option<SentBytesTotal>;
fn sent_events_total(&self) -> Option<SentEventsTotal>;
}
impl<'a> MetricsFilter<'a> for Vec<Metric> {
fn received_bytes_total(&self) -> Option<ReceivedBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_received_bytes_total"),
)?;
Some(ReceivedBytesTotal::new(sum))
}
fn received_events_total(&self) -> Option<ReceivedEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_received_events_total"),
)?;
Some(ReceivedEventsTotal::new(sum))
}
fn sent_bytes_total(&self) -> Option<SentBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_sent_bytes_total"),
)?;
Some(SentBytesTotal::new(sum))
}
fn sent_events_total(&self) -> Option<SentEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_sent_events_total"),
)?;
Some(SentEventsTotal::new(sum))
}
}
impl<'a> MetricsFilter<'a> for Vec<&'a Metric> {
fn received_bytes_total(&self) -> Option<ReceivedBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_received_bytes_total")
.copied(),
)?;
Some(ReceivedBytesTotal::new(sum))
}
fn received_events_total(&self) -> Option<ReceivedEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_received_events_total")
.copied(),
)?;
Some(ReceivedEventsTotal::new(sum))
}
fn sent_bytes_total(&self) -> Option<SentBytesTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_sent_bytes_total")
.copied(),
)?;
Some(SentBytesTotal::new(sum))
}
fn sent_events_total(&self) -> Option<SentEventsTotal> {
let sum = sum_metrics(
self.iter()
.filter(|m| m.name() == "component_sent_events_total")
.copied(),
)?;
Some(SentEventsTotal::new(sum))
}
}
pub fn get_metrics(interval: i32) -> impl Stream<Item = Metric> {
let controller = get_controller();
let mut interval = tokio::time::interval(Duration::from_millis(interval as u64));
stream! {
loop {
interval.tick().await;
for m in controller.capture_metrics() {
yield m;
}
}
}
}
pub fn get_all_metrics(interval: i32) -> impl Stream<Item = Vec<Metric>> {
let controller = get_controller();
let mut interval = tokio::time::interval(Duration::from_millis(interval as u64));
stream! {
loop {
interval.tick().await;
yield controller.capture_metrics()
}
}
}
pub fn by_component_key(component_key: &ComponentKey) -> Vec<Metric> {
get_controller()
.capture_metrics()
.into_iter()
.filter_map(|m| {
m.tag_matches("component_id", component_key.id())
.then_some(m)
})
.collect()
}
type MetricFilterFn = dyn Fn(&Metric) -> bool + Send + Sync;
pub fn component_counter_metrics(
interval: i32,
filter_fn: &'static MetricFilterFn,
) -> impl Stream<Item = Vec<Metric>> {
let mut cache = BTreeMap::new();
component_to_filtered_metrics(interval, filter_fn).map(move |map| {
map.into_iter()
.filter_map(|(id, metrics)| {
let m = sum_metrics_owned(metrics)?;
match m.value() {
MetricValue::Counter { value }
if cache.insert(id, *value).unwrap_or(0.00) < *value =>
{
Some(m)
}
_ => None,
}
})
.collect()
})
}
pub fn component_gauge_metrics(
interval: i32,
filter_fn: &'static MetricFilterFn,
) -> impl Stream<Item = Vec<Metric>> {
let mut cache = BTreeMap::new();
component_to_filtered_metrics(interval, filter_fn).map(move |map| {
map.into_iter()
.filter_map(|(id, metrics)| {
let m = sum_metrics_owned(metrics)?;
match m.value() {
MetricValue::Gauge { value }
if cache.insert(id, *value).unwrap_or(0.00) < *value =>
{
Some(m)
}
_ => None,
}
})
.collect()
})
}
pub fn counter_throughput(
interval: i32,
filter_fn: &'static MetricFilterFn,
) -> impl Stream<Item = (Metric, f64)> {
let mut last = 0.00;
get_metrics(interval)
.filter(filter_fn)
.filter_map(move |m| match m.value() {
MetricValue::Counter { value } if *value > last => {
let throughput = value - last;
last = *value;
Some((m, throughput))
}
_ => None,
})
.skip(1)
}
pub fn component_counter_throughputs(
interval: i32,
filter_fn: &'static MetricFilterFn,
) -> impl Stream<Item = Vec<(Metric, f64)>> {
let mut cache = BTreeMap::new();
component_to_filtered_metrics(interval, filter_fn)
.map(move |map| {
map.into_iter()
.filter_map(|(id, metrics)| {
let m = sum_metrics_owned(metrics)?;
match m.value() {
MetricValue::Counter { value } => {
let last = cache.insert(id, *value).unwrap_or(0.00);
let throughput = value - last;
Some((m, throughput))
}
_ => None,
}
})
.collect()
})
.skip(1)
}
pub fn component_sent_events_totals_metrics_with_outputs(
interval: i32,
) -> impl Stream<Item = Vec<(Metric, Vec<Metric>)>> {
let mut cache = BTreeMap::new();
component_to_filtered_metrics(interval, &|m| m.name() == "component_sent_events_total").map(
move |map| {
map.into_iter()
.filter_map(|(id, metrics)| {
let outputs = metrics
.iter()
.filter_map(|m| m.tag_value("output"))
.collect::<HashSet<_>>();
let metric_by_outputs = outputs
.iter()
.filter_map(|output| {
let m = filter_output_metric(metrics.as_ref(), output.as_ref())?;
match m.value() {
MetricValue::Counter { value }
if cache
.insert(format!("{}.{}", id, output), *value)
.unwrap_or(0.00)
< *value =>
{
Some(m)
}
_ => None,
}
})
.collect();
let sum = sum_metrics_owned(metrics)?;
match sum.value() {
MetricValue::Counter { value }
if cache.insert(id, *value).unwrap_or(0.00) < *value =>
{
Some((sum, metric_by_outputs))
}
_ => None,
}
})
.collect()
},
)
}
pub fn component_sent_events_total_throughputs_with_outputs(
interval: i32,
) -> impl Stream<Item = Vec<(ComponentKey, i64, Vec<OutputThroughput>)>> {
let mut cache = BTreeMap::new();
component_to_filtered_metrics(interval, &|m| m.name() == "component_sent_events_total")
.map(move |map| {
map.into_iter()
.filter_map(|(id, metrics)| {
let outputs = metrics
.iter()
.filter_map(|m| m.tag_value("output"))
.collect::<HashSet<_>>();
let throughput_by_outputs = outputs
.iter()
.filter_map(|output| {
let m = filter_output_metric(metrics.as_ref(), output.as_ref())?;
let throughput =
throughput(&m, format!("{}.{}", id, output), &mut cache)?;
Some(OutputThroughput::new(output.clone(), throughput as i64))
})
.collect::<Vec<_>>();
let sum = sum_metrics_owned(metrics)?;
let total_throughput = throughput(&sum, id.clone(), &mut cache)?;
Some((
ComponentKey::from(id),
total_throughput as i64,
throughput_by_outputs,
))
})
.collect()
})
.skip(1)
}
fn component_to_filtered_metrics(
interval: i32,
filter_fn: &'static MetricFilterFn,
) -> impl Stream<Item = BTreeMap<String, Vec<Metric>>> {
get_all_metrics(interval).map(move |m| {
m.into_iter()
.filter(filter_fn)
.filter_map(|m| m.tag_value("component_id").map(|id| (id, m)))
.fold(
BTreeMap::new(),
|mut map: BTreeMap<String, Vec<Metric>>, (id, m)| {
map.entry(id).or_default().push(m);
map
},
)
})
}
fn throughput(metric: &Metric, id: String, cache: &mut BTreeMap<String, f64>) -> Option<f64> {
match metric.value() {
MetricValue::Counter { value } => {
let last = cache.insert(id, *value).unwrap_or(0.00);
let throughput = value - last;
Some(throughput)
}
_ => None,
}
}