vector/api/schema/metrics/
mod.rs

1mod allocated_bytes;
2mod errors;
3pub mod filter;
4mod output;
5mod received_bytes;
6mod received_events;
7mod sent_bytes;
8mod sent_events;
9mod sink;
10pub mod source;
11mod transform;
12mod uptime;
13
14#[cfg(feature = "sources-host_metrics")]
15mod host;
16
17pub use allocated_bytes::{AllocatedBytes, ComponentAllocatedBytes};
18use async_graphql::{Interface, Subscription};
19use chrono::{DateTime, Utc};
20pub use errors::{ComponentErrorsTotal, ErrorsTotal};
21pub use filter::*;
22pub use output::*;
23pub use received_bytes::{
24    ComponentReceivedBytesThroughput, ComponentReceivedBytesTotal, ReceivedBytesTotal,
25};
26pub use received_events::{
27    ComponentReceivedEventsThroughput, ComponentReceivedEventsTotal, ReceivedEventsTotal,
28};
29pub use sent_bytes::{ComponentSentBytesThroughput, ComponentSentBytesTotal, SentBytesTotal};
30pub use sent_events::{ComponentSentEventsThroughput, ComponentSentEventsTotal, SentEventsTotal};
31pub use sink::{IntoSinkMetrics, SinkMetrics};
32pub use source::{IntoSourceMetrics, SourceMetrics};
33use tokio_stream::{Stream, StreamExt};
34pub use transform::{IntoTransformMetrics, TransformMetrics};
35pub use uptime::Uptime;
36
37use crate::config::ComponentKey;
38
39#[derive(Interface)]
40#[graphql(field(name = "timestamp", ty = "Option<DateTime<Utc>>"))]
41pub enum MetricType {
42    Uptime(Uptime),
43}
44
45#[cfg(feature = "sources-host_metrics")]
46#[derive(Default)]
47pub struct MetricsQuery;
48
49#[cfg(feature = "sources-host_metrics")]
50#[async_graphql::Object]
51impl MetricsQuery {
52    /// Vector host metrics
53    async fn host_metrics(&self) -> host::HostMetrics {
54        host::HostMetrics::new()
55    }
56}
57
58#[derive(Default)]
59pub struct MetricsSubscription;
60
61#[Subscription]
62impl MetricsSubscription {
63    /// Metrics for how long the Vector instance has been running
64    async fn uptime(
65        &self,
66        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
67    ) -> impl Stream<Item = Uptime> + use<> {
68        get_metrics(interval).filter_map(|m| match m.name() {
69            "uptime_seconds" => Some(Uptime::new(m)),
70            _ => None,
71        })
72    }
73
74    /// Total received events metrics
75    #[graphql(deprecation = "Use component_received_events_totals instead")]
76    async fn received_events_total(
77        &self,
78        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
79    ) -> impl Stream<Item = ReceivedEventsTotal> + use<> {
80        get_metrics(interval).filter_map(|m| match m.name() {
81            "component_received_events_total" => Some(ReceivedEventsTotal::new(m)),
82            _ => None,
83        })
84    }
85
86    /// Total received events throughput sampled over the provided millisecond `interval`
87    #[graphql(deprecation = "Use component_received_events_throughputs instead")]
88    async fn received_events_throughput(
89        &self,
90        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
91    ) -> impl Stream<Item = i64> + use<> {
92        counter_throughput(interval, &|m| m.name() == "component_received_events_total")
93            .map(|(_, throughput)| throughput as i64)
94    }
95
96    /// Total incoming component events throughput metrics over `interval`
97    async fn component_received_events_throughputs(
98        &self,
99        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
100    ) -> impl Stream<Item = Vec<ComponentReceivedEventsThroughput>> + use<> {
101        component_counter_throughputs(interval, &|m| m.name() == "component_received_events_total")
102            .map(|m| {
103                m.into_iter()
104                    .map(|(m, throughput)| {
105                        ComponentReceivedEventsThroughput::new(
106                            ComponentKey::from(m.tag_value("component_id").unwrap()),
107                            throughput as i64,
108                        )
109                    })
110                    .collect()
111            })
112    }
113
114    /// Total received component event metrics over `interval`
115    async fn component_received_events_totals(
116        &self,
117        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
118    ) -> impl Stream<Item = Vec<ComponentReceivedEventsTotal>> + use<> {
119        component_counter_metrics(interval, &|m| m.name() == "component_received_events_total").map(
120            |m| {
121                m.into_iter()
122                    .map(ComponentReceivedEventsTotal::new)
123                    .collect()
124            },
125        )
126    }
127
128    /// Total sent events metrics
129    #[graphql(deprecation = "Use component_sent_events_totals instead")]
130    async fn sent_events_total(
131        &self,
132        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
133    ) -> impl Stream<Item = SentEventsTotal> + use<> {
134        get_metrics(interval).filter_map(|m| match m.name() {
135            "component_sent_events_total" => Some(SentEventsTotal::new(m)),
136            _ => None,
137        })
138    }
139
140    /// Total outgoing events throughput sampled over the provided millisecond `interval`
141    #[graphql(deprecation = "Use component_sent_events_throughputs instead")]
142    async fn sent_events_throughput(
143        &self,
144        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
145    ) -> impl Stream<Item = i64> + use<> {
146        counter_throughput(interval, &|m| m.name() == "component_sent_events_total")
147            .map(|(_, throughput)| throughput as i64)
148    }
149
150    /// Total outgoing component event throughput metrics over `interval`
151    async fn component_sent_events_throughputs(
152        &self,
153        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
154    ) -> impl Stream<Item = Vec<ComponentSentEventsThroughput>> + use<> {
155        component_sent_events_total_throughputs_with_outputs(interval).map(|m| {
156            m.into_iter()
157                .map(|(key, total_throughput, outputs)| {
158                    ComponentSentEventsThroughput::new(key, total_throughput, outputs)
159                })
160                .collect()
161        })
162    }
163
164    /// Total outgoing component event metrics over `interval`
165    async fn component_sent_events_totals(
166        &self,
167        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
168    ) -> impl Stream<Item = Vec<ComponentSentEventsTotal>> + use<> {
169        component_sent_events_totals_metrics_with_outputs(interval).map(|ms| {
170            ms.into_iter()
171                .map(|(m, m_by_outputs)| ComponentSentEventsTotal::new(m, m_by_outputs))
172                .collect()
173        })
174    }
175
176    /// Component bytes received metrics over `interval`.
177    async fn component_received_bytes_totals(
178        &self,
179        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
180    ) -> impl Stream<Item = Vec<ComponentReceivedBytesTotal>> + use<> {
181        component_counter_metrics(interval, &|m| m.name() == "component_received_bytes_total").map(
182            |m| {
183                m.into_iter()
184                    .map(ComponentReceivedBytesTotal::new)
185                    .collect()
186            },
187        )
188    }
189
190    /// Component bytes received throughput over `interval`
191    async fn component_received_bytes_throughputs(
192        &self,
193        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
194    ) -> impl Stream<Item = Vec<ComponentReceivedBytesThroughput>> + use<> {
195        component_counter_throughputs(interval, &|m| m.name() == "component_received_bytes_total")
196            .map(|m| {
197                m.into_iter()
198                    .map(|(m, throughput)| {
199                        ComponentReceivedBytesThroughput::new(
200                            ComponentKey::from(m.tag_value("component_id").unwrap()),
201                            throughput as i64,
202                        )
203                    })
204                    .collect()
205            })
206    }
207
208    /// Component bytes sent metrics over `interval`.
209    async fn component_sent_bytes_totals(
210        &self,
211        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
212    ) -> impl Stream<Item = Vec<ComponentSentBytesTotal>> + use<> {
213        component_counter_metrics(interval, &|m| m.name() == "component_sent_bytes_total")
214            .map(|m| m.into_iter().map(ComponentSentBytesTotal::new).collect())
215    }
216
217    /// Component bytes sent throughput over `interval`
218    async fn component_sent_bytes_throughputs(
219        &self,
220        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
221    ) -> impl Stream<Item = Vec<ComponentSentBytesThroughput>> + use<> {
222        component_counter_throughputs(interval, &|m| m.name() == "component_sent_bytes_total").map(
223            |m| {
224                m.into_iter()
225                    .map(|(m, throughput)| {
226                        ComponentSentBytesThroughput::new(
227                            ComponentKey::from(m.tag_value("component_id").unwrap()),
228                            throughput as i64,
229                        )
230                    })
231                    .collect()
232            },
233        )
234    }
235
236    /// Total error metrics.
237    async fn errors_total(
238        &self,
239        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
240    ) -> impl Stream<Item = ErrorsTotal> + use<> {
241        get_metrics(interval)
242            .filter(|m| m.name().ends_with("_errors_total"))
243            .map(ErrorsTotal::new)
244    }
245
246    /// Allocated bytes metrics.
247    async fn allocated_bytes(
248        &self,
249        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
250    ) -> impl Stream<Item = AllocatedBytes> + use<> {
251        get_metrics(interval)
252            .filter(|m| m.name() == "component_allocated_bytes")
253            .map(AllocatedBytes::new)
254    }
255
256    /// Component allocation metrics
257    async fn component_allocated_bytes(
258        &self,
259        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
260    ) -> impl Stream<Item = Vec<ComponentAllocatedBytes>> + use<> {
261        component_gauge_metrics(interval, &|m| m.name() == "component_allocated_bytes")
262            .map(|m| m.into_iter().map(ComponentAllocatedBytes::new).collect())
263    }
264
265    /// Component error metrics over `interval`.
266    async fn component_errors_totals(
267        &self,
268        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
269    ) -> impl Stream<Item = Vec<ComponentErrorsTotal>> + use<> {
270        component_counter_metrics(interval, &|m| m.name().ends_with("_errors_total"))
271            .map(|m| m.into_iter().map(ComponentErrorsTotal::new).collect())
272    }
273
274    /// All metrics.
275    async fn metrics(
276        &self,
277        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
278    ) -> impl Stream<Item = MetricType> + use<> {
279        get_metrics(interval).filter_map(|m| match m.name() {
280            "uptime_seconds" => Some(MetricType::Uptime(m.into())),
281            _ => None,
282        })
283    }
284}