// vector/api/schema/metrics/mod.rs
mod allocated_bytes;
mod errors;
pub mod filter;
mod output;
mod received_bytes;
mod received_events;
mod sent_bytes;
mod sent_events;
mod sink;
pub mod source;
mod transform;
mod uptime;

#[cfg(feature = "sources-host_metrics")]
mod host;

pub use allocated_bytes::{AllocatedBytes, ComponentAllocatedBytes};
use async_graphql::{Interface, Subscription};
use chrono::{DateTime, Utc};
pub use errors::{ComponentErrorsTotal, ErrorsTotal};
pub use filter::*;
pub use output::*;
pub use received_bytes::{
    ComponentReceivedBytesThroughput, ComponentReceivedBytesTotal, ReceivedBytesTotal,
};
pub use received_events::{
    ComponentReceivedEventsThroughput, ComponentReceivedEventsTotal, ReceivedEventsTotal,
};
pub use sent_bytes::{ComponentSentBytesThroughput, ComponentSentBytesTotal, SentBytesTotal};
pub use sent_events::{ComponentSentEventsThroughput, ComponentSentEventsTotal, SentEventsTotal};
pub use sink::{IntoSinkMetrics, SinkMetrics};
pub use source::{IntoSourceMetrics, SourceMetrics};
use tokio_stream::{Stream, StreamExt};
pub use transform::{IntoTransformMetrics, TransformMetrics};
pub use uptime::Uptime;

use crate::config::ComponentKey;

39#[derive(Interface)]
40#[graphql(field(name = "timestamp", ty = "Option<DateTime<Utc>>"))]
41pub enum MetricType {
42 Uptime(Uptime),
43}
44
/// Root query object for metrics.
///
/// Only compiled when the `sources-host_metrics` feature is enabled.
#[cfg(feature = "sources-host_metrics")]
#[derive(Default)]
pub struct MetricsQuery;

#[cfg(feature = "sources-host_metrics")]
#[async_graphql::Object]
impl MetricsQuery {
    /// Returns a freshly constructed `host::HostMetrics` value.
    async fn host_metrics(&self) -> host::HostMetrics {
        host::HostMetrics::new()
    }
}

/// Root subscription object for metrics; its fields are defined in the
/// `#[Subscription]` impl block for this type.
#[derive(Default)]
pub struct MetricsSubscription;

61#[Subscription]
62impl MetricsSubscription {
63 async fn uptime(
65 &self,
66 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
67 ) -> impl Stream<Item = Uptime> + use<> {
68 get_metrics(interval).filter_map(|m| match m.name() {
69 "uptime_seconds" => Some(Uptime::new(m)),
70 _ => None,
71 })
72 }
73
74 #[graphql(deprecation = "Use component_received_events_totals instead")]
76 async fn received_events_total(
77 &self,
78 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
79 ) -> impl Stream<Item = ReceivedEventsTotal> + use<> {
80 get_metrics(interval).filter_map(|m| match m.name() {
81 "component_received_events_total" => Some(ReceivedEventsTotal::new(m)),
82 _ => None,
83 })
84 }
85
86 #[graphql(deprecation = "Use component_received_events_throughputs instead")]
88 async fn received_events_throughput(
89 &self,
90 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
91 ) -> impl Stream<Item = i64> + use<> {
92 counter_throughput(interval, &|m| m.name() == "component_received_events_total")
93 .map(|(_, throughput)| throughput as i64)
94 }
95
96 async fn component_received_events_throughputs(
98 &self,
99 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
100 ) -> impl Stream<Item = Vec<ComponentReceivedEventsThroughput>> + use<> {
101 component_counter_throughputs(interval, &|m| m.name() == "component_received_events_total")
102 .map(|m| {
103 m.into_iter()
104 .map(|(m, throughput)| {
105 ComponentReceivedEventsThroughput::new(
106 ComponentKey::from(m.tag_value("component_id").unwrap()),
107 throughput as i64,
108 )
109 })
110 .collect()
111 })
112 }
113
114 async fn component_received_events_totals(
116 &self,
117 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
118 ) -> impl Stream<Item = Vec<ComponentReceivedEventsTotal>> + use<> {
119 component_counter_metrics(interval, &|m| m.name() == "component_received_events_total").map(
120 |m| {
121 m.into_iter()
122 .map(ComponentReceivedEventsTotal::new)
123 .collect()
124 },
125 )
126 }
127
128 #[graphql(deprecation = "Use component_sent_events_totals instead")]
130 async fn sent_events_total(
131 &self,
132 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
133 ) -> impl Stream<Item = SentEventsTotal> + use<> {
134 get_metrics(interval).filter_map(|m| match m.name() {
135 "component_sent_events_total" => Some(SentEventsTotal::new(m)),
136 _ => None,
137 })
138 }
139
140 #[graphql(deprecation = "Use component_sent_events_throughputs instead")]
142 async fn sent_events_throughput(
143 &self,
144 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
145 ) -> impl Stream<Item = i64> + use<> {
146 counter_throughput(interval, &|m| m.name() == "component_sent_events_total")
147 .map(|(_, throughput)| throughput as i64)
148 }
149
150 async fn component_sent_events_throughputs(
152 &self,
153 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
154 ) -> impl Stream<Item = Vec<ComponentSentEventsThroughput>> + use<> {
155 component_sent_events_total_throughputs_with_outputs(interval).map(|m| {
156 m.into_iter()
157 .map(|(key, total_throughput, outputs)| {
158 ComponentSentEventsThroughput::new(key, total_throughput, outputs)
159 })
160 .collect()
161 })
162 }
163
164 async fn component_sent_events_totals(
166 &self,
167 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
168 ) -> impl Stream<Item = Vec<ComponentSentEventsTotal>> + use<> {
169 component_sent_events_totals_metrics_with_outputs(interval).map(|ms| {
170 ms.into_iter()
171 .map(|(m, m_by_outputs)| ComponentSentEventsTotal::new(m, m_by_outputs))
172 .collect()
173 })
174 }
175
176 async fn component_received_bytes_totals(
178 &self,
179 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
180 ) -> impl Stream<Item = Vec<ComponentReceivedBytesTotal>> + use<> {
181 component_counter_metrics(interval, &|m| m.name() == "component_received_bytes_total").map(
182 |m| {
183 m.into_iter()
184 .map(ComponentReceivedBytesTotal::new)
185 .collect()
186 },
187 )
188 }
189
190 async fn component_received_bytes_throughputs(
192 &self,
193 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
194 ) -> impl Stream<Item = Vec<ComponentReceivedBytesThroughput>> + use<> {
195 component_counter_throughputs(interval, &|m| m.name() == "component_received_bytes_total")
196 .map(|m| {
197 m.into_iter()
198 .map(|(m, throughput)| {
199 ComponentReceivedBytesThroughput::new(
200 ComponentKey::from(m.tag_value("component_id").unwrap()),
201 throughput as i64,
202 )
203 })
204 .collect()
205 })
206 }
207
208 async fn component_sent_bytes_totals(
210 &self,
211 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
212 ) -> impl Stream<Item = Vec<ComponentSentBytesTotal>> + use<> {
213 component_counter_metrics(interval, &|m| m.name() == "component_sent_bytes_total")
214 .map(|m| m.into_iter().map(ComponentSentBytesTotal::new).collect())
215 }
216
217 async fn component_sent_bytes_throughputs(
219 &self,
220 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
221 ) -> impl Stream<Item = Vec<ComponentSentBytesThroughput>> + use<> {
222 component_counter_throughputs(interval, &|m| m.name() == "component_sent_bytes_total").map(
223 |m| {
224 m.into_iter()
225 .map(|(m, throughput)| {
226 ComponentSentBytesThroughput::new(
227 ComponentKey::from(m.tag_value("component_id").unwrap()),
228 throughput as i64,
229 )
230 })
231 .collect()
232 },
233 )
234 }
235
236 async fn errors_total(
238 &self,
239 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
240 ) -> impl Stream<Item = ErrorsTotal> + use<> {
241 get_metrics(interval)
242 .filter(|m| m.name().ends_with("_errors_total"))
243 .map(ErrorsTotal::new)
244 }
245
246 async fn allocated_bytes(
248 &self,
249 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
250 ) -> impl Stream<Item = AllocatedBytes> + use<> {
251 get_metrics(interval)
252 .filter(|m| m.name() == "component_allocated_bytes")
253 .map(AllocatedBytes::new)
254 }
255
256 async fn component_allocated_bytes(
258 &self,
259 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
260 ) -> impl Stream<Item = Vec<ComponentAllocatedBytes>> + use<> {
261 component_gauge_metrics(interval, &|m| m.name() == "component_allocated_bytes")
262 .map(|m| m.into_iter().map(ComponentAllocatedBytes::new).collect())
263 }
264
265 async fn component_errors_totals(
267 &self,
268 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
269 ) -> impl Stream<Item = Vec<ComponentErrorsTotal>> + use<> {
270 component_counter_metrics(interval, &|m| m.name().ends_with("_errors_total"))
271 .map(|m| m.into_iter().map(ComponentErrorsTotal::new).collect())
272 }
273
274 async fn metrics(
276 &self,
277 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
278 ) -> impl Stream<Item = MetricType> + use<> {
279 get_metrics(interval).filter_map(|m| match m.name() {
280 "uptime_seconds" => Some(MetricType::Uptime(m.into())),
281 _ => None,
282 })
283 }
284}