vector/api/schema/metrics/
mod.rs1mod allocated_bytes;
2mod errors;
3pub mod filter;
4mod output;
5mod received_bytes;
6mod received_events;
7mod sent_bytes;
8mod sent_events;
9mod sink;
10pub mod source;
11mod transform;
12mod uptime;
13
14#[cfg(feature = "sources-host_metrics")]
15mod host;
16
17pub use allocated_bytes::{AllocatedBytes, ComponentAllocatedBytes};
18use async_graphql::{Interface, Subscription};
19use chrono::{DateTime, Utc};
20pub use errors::{ComponentErrorsTotal, ErrorsTotal};
21pub use filter::*;
22pub use output::*;
23pub use received_bytes::{
24 ComponentReceivedBytesThroughput, ComponentReceivedBytesTotal, ReceivedBytesTotal,
25};
26pub use received_events::{
27 ComponentReceivedEventsThroughput, ComponentReceivedEventsTotal, ReceivedEventsTotal,
28};
29pub use sent_bytes::{ComponentSentBytesThroughput, ComponentSentBytesTotal, SentBytesTotal};
30pub use sent_events::{ComponentSentEventsThroughput, ComponentSentEventsTotal, SentEventsTotal};
31pub use sink::{IntoSinkMetrics, SinkMetrics};
32pub use source::{IntoSourceMetrics, SourceMetrics};
33use tokio_stream::{Stream, StreamExt};
34pub use transform::{IntoTransformMetrics, TransformMetrics};
35pub use uptime::Uptime;
36
37use crate::config::ComponentKey;
38
// GraphQL interface over the metric types exposed by this module; `Uptime` is the only variant
// declared here. The `graphql(field(...))` attribute declares a single interface field,
// `timestamp`, typed as `Option<DateTime<Utc>>`.
//
// NOTE(review): written as `//` rather than `///` on the assumption that `#[derive(Interface)]`
// would surface a doc comment as the interface's GraphQL description -- confirm before converting.
#[derive(Interface)]
#[graphql(field(name = "timestamp", ty = "Option<DateTime<Utc>>"))]
pub enum MetricType {
    Uptime(Uptime),
}
44
/// Stateless type on which the metrics query fields are implemented.
///
/// The only resolver visible in this file, `host_metrics`, is compiled in only when the
/// `sources-host_metrics` feature is enabled.
#[derive(Default)]
pub struct MetricsQuery;
47
// Query resolvers for `MetricsQuery`. The whole impl is gated on the `sources-host_metrics`
// feature, matching the gate on the `host` module declaration.
#[cfg(feature = "sources-host_metrics")]
#[async_graphql::Object]
impl MetricsQuery {
    // Returns a freshly constructed `host::HostMetrics`; the resolver itself reads no state.
    // NOTE(review): kept as `//` because `#[Object]` is expected to turn `///` into the field's
    // GraphQL description -- confirm before converting.
    async fn host_metrics(&self) -> host::HostMetrics {
        host::HostMetrics::new()
    }
}
56
/// Stateless type on which the metrics subscription fields are implemented.
///
/// Each field takes an `interval` argument and returns a stream of metric values.
#[derive(Default)]
pub struct MetricsSubscription;
59
60#[Subscription]
61impl MetricsSubscription {
62 async fn uptime(
64 &self,
65 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
66 ) -> impl Stream<Item = Uptime> + use<> {
67 get_metrics(interval).filter_map(|m| match m.name() {
68 "uptime_seconds" => Some(Uptime::new(m)),
69 _ => None,
70 })
71 }
72
73 #[graphql(deprecation = "Use component_received_events_totals instead")]
75 async fn received_events_total(
76 &self,
77 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
78 ) -> impl Stream<Item = ReceivedEventsTotal> + use<> {
79 get_metrics(interval).filter_map(|m| match m.name() {
80 "component_received_events_total" => Some(ReceivedEventsTotal::new(m)),
81 _ => None,
82 })
83 }
84
85 #[graphql(deprecation = "Use component_received_events_throughputs instead")]
87 async fn received_events_throughput(
88 &self,
89 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
90 ) -> impl Stream<Item = i64> + use<> {
91 counter_throughput(interval, &|m| m.name() == "component_received_events_total")
92 .map(|(_, throughput)| throughput as i64)
93 }
94
95 async fn component_received_events_throughputs(
97 &self,
98 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
99 ) -> impl Stream<Item = Vec<ComponentReceivedEventsThroughput>> + use<> {
100 component_counter_throughputs(interval, &|m| m.name() == "component_received_events_total")
101 .map(|m| {
102 m.into_iter()
103 .map(|(m, throughput)| {
104 ComponentReceivedEventsThroughput::new(
105 ComponentKey::from(m.tag_value("component_id").unwrap()),
106 throughput as i64,
107 )
108 })
109 .collect()
110 })
111 }
112
113 async fn component_received_events_totals(
115 &self,
116 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
117 ) -> impl Stream<Item = Vec<ComponentReceivedEventsTotal>> + use<> {
118 component_counter_metrics(interval, &|m| m.name() == "component_received_events_total").map(
119 |m| {
120 m.into_iter()
121 .map(ComponentReceivedEventsTotal::new)
122 .collect()
123 },
124 )
125 }
126
127 #[graphql(deprecation = "Use component_sent_events_totals instead")]
129 async fn sent_events_total(
130 &self,
131 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
132 ) -> impl Stream<Item = SentEventsTotal> + use<> {
133 get_metrics(interval).filter_map(|m| match m.name() {
134 "component_sent_events_total" => Some(SentEventsTotal::new(m)),
135 _ => None,
136 })
137 }
138
139 #[graphql(deprecation = "Use component_sent_events_throughputs instead")]
141 async fn sent_events_throughput(
142 &self,
143 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
144 ) -> impl Stream<Item = i64> + use<> {
145 counter_throughput(interval, &|m| m.name() == "component_sent_events_total")
146 .map(|(_, throughput)| throughput as i64)
147 }
148
149 async fn component_sent_events_throughputs(
151 &self,
152 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
153 ) -> impl Stream<Item = Vec<ComponentSentEventsThroughput>> + use<> {
154 component_sent_events_total_throughputs_with_outputs(interval).map(|m| {
155 m.into_iter()
156 .map(|(key, total_throughput, outputs)| {
157 ComponentSentEventsThroughput::new(key, total_throughput, outputs)
158 })
159 .collect()
160 })
161 }
162
163 async fn component_sent_events_totals(
165 &self,
166 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
167 ) -> impl Stream<Item = Vec<ComponentSentEventsTotal>> + use<> {
168 component_sent_events_totals_metrics_with_outputs(interval).map(|ms| {
169 ms.into_iter()
170 .map(|(m, m_by_outputs)| ComponentSentEventsTotal::new(m, m_by_outputs))
171 .collect()
172 })
173 }
174
175 async fn component_received_bytes_totals(
177 &self,
178 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
179 ) -> impl Stream<Item = Vec<ComponentReceivedBytesTotal>> + use<> {
180 component_counter_metrics(interval, &|m| m.name() == "component_received_bytes_total").map(
181 |m| {
182 m.into_iter()
183 .map(ComponentReceivedBytesTotal::new)
184 .collect()
185 },
186 )
187 }
188
189 async fn component_received_bytes_throughputs(
191 &self,
192 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
193 ) -> impl Stream<Item = Vec<ComponentReceivedBytesThroughput>> + use<> {
194 component_counter_throughputs(interval, &|m| m.name() == "component_received_bytes_total")
195 .map(|m| {
196 m.into_iter()
197 .map(|(m, throughput)| {
198 ComponentReceivedBytesThroughput::new(
199 ComponentKey::from(m.tag_value("component_id").unwrap()),
200 throughput as i64,
201 )
202 })
203 .collect()
204 })
205 }
206
207 async fn component_sent_bytes_totals(
209 &self,
210 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
211 ) -> impl Stream<Item = Vec<ComponentSentBytesTotal>> + use<> {
212 component_counter_metrics(interval, &|m| m.name() == "component_sent_bytes_total")
213 .map(|m| m.into_iter().map(ComponentSentBytesTotal::new).collect())
214 }
215
216 async fn component_sent_bytes_throughputs(
218 &self,
219 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
220 ) -> impl Stream<Item = Vec<ComponentSentBytesThroughput>> + use<> {
221 component_counter_throughputs(interval, &|m| m.name() == "component_sent_bytes_total").map(
222 |m| {
223 m.into_iter()
224 .map(|(m, throughput)| {
225 ComponentSentBytesThroughput::new(
226 ComponentKey::from(m.tag_value("component_id").unwrap()),
227 throughput as i64,
228 )
229 })
230 .collect()
231 },
232 )
233 }
234
235 async fn errors_total(
237 &self,
238 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
239 ) -> impl Stream<Item = ErrorsTotal> + use<> {
240 get_metrics(interval)
241 .filter(|m| m.name().ends_with("_errors_total"))
242 .map(ErrorsTotal::new)
243 }
244
245 async fn allocated_bytes(
247 &self,
248 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
249 ) -> impl Stream<Item = AllocatedBytes> + use<> {
250 get_metrics(interval)
251 .filter(|m| m.name() == "component_allocated_bytes")
252 .map(AllocatedBytes::new)
253 }
254
255 async fn component_allocated_bytes(
257 &self,
258 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
259 ) -> impl Stream<Item = Vec<ComponentAllocatedBytes>> + use<> {
260 component_gauge_metrics(interval, &|m| m.name() == "component_allocated_bytes")
261 .map(|m| m.into_iter().map(ComponentAllocatedBytes::new).collect())
262 }
263
264 async fn component_errors_totals(
266 &self,
267 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
268 ) -> impl Stream<Item = Vec<ComponentErrorsTotal>> + use<> {
269 component_counter_metrics(interval, &|m| m.name().ends_with("_errors_total"))
270 .map(|m| m.into_iter().map(ComponentErrorsTotal::new).collect())
271 }
272
273 async fn metrics(
275 &self,
276 #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
277 ) -> impl Stream<Item = MetricType> + use<> {
278 get_metrics(interval).filter_map(|m| match m.name() {
279 "uptime_seconds" => Some(MetricType::Uptime(m.into())),
280 _ => None,
281 })
282 }
283}