vector/api/schema/metrics/
output.rs

1use async_graphql::Object;
2use vector_lib::config::ComponentKey;
3
4use super::{by_component_key, sum_metrics, SentEventsTotal};
5use crate::event::Metric;
6
7#[derive(Debug, Clone)]
8pub struct Output {
9    output_id: String,
10    sent_events_total: Option<Metric>,
11}
12
13impl Output {
14    pub const fn new(output_id: String, sent_events_total: Option<Metric>) -> Self {
15        Self {
16            output_id,
17            sent_events_total,
18        }
19    }
20}
21
22#[Object]
23impl Output {
24    /// Id of the output stream
25    pub async fn output_id(&self) -> &str {
26        self.output_id.as_ref()
27    }
28
29    /// Total sent events for the current output stream
30    pub async fn sent_events_total(&self) -> Option<SentEventsTotal> {
31        self.sent_events_total
32            .as_ref()
33            .map(|metric| SentEventsTotal::new(metric.clone()))
34    }
35}
36
37#[derive(Debug, Clone)]
38pub struct OutputThroughput {
39    output_id: String,
40    throughput: i64,
41}
42
43impl OutputThroughput {
44    pub const fn new(output_id: String, throughput: i64) -> Self {
45        Self {
46            output_id,
47            throughput,
48        }
49    }
50}
51
52#[Object]
53impl OutputThroughput {
54    /// Id of the output stream
55    pub async fn output_id(&self) -> &str {
56        self.output_id.as_ref()
57    }
58
59    /// Throughput for the output stream
60    pub async fn throughput(&self) -> i64 {
61        self.throughput
62    }
63}
64
65pub fn outputs_by_component_key(component_key: &ComponentKey, outputs: &[String]) -> Vec<Output> {
66    let metrics = by_component_key(component_key)
67        .into_iter()
68        .filter(|m| m.name() == "component_sent_events_total")
69        .collect::<Vec<_>>();
70
71    outputs
72        .iter()
73        .map(|output| {
74            Output::new(
75                output.clone(),
76                filter_output_metric(&metrics, output.as_ref()),
77            )
78        })
79        .collect::<Vec<_>>()
80}
81
82pub fn filter_output_metric(metrics: &[Metric], output_name: &str) -> Option<Metric> {
83    sum_metrics(
84        metrics
85            .iter()
86            .filter(|m| m.tag_matches("output", output_name)),
87    )
88}