vector/api/schema/metrics/
mod.rs

1mod allocated_bytes;
2mod errors;
3pub mod filter;
4mod output;
5mod received_bytes;
6mod received_events;
7mod sent_bytes;
8mod sent_events;
9mod sink;
10pub mod source;
11mod transform;
12mod uptime;
13
14#[cfg(feature = "sources-host_metrics")]
15mod host;
16
17pub use allocated_bytes::{AllocatedBytes, ComponentAllocatedBytes};
18use async_graphql::{Interface, Subscription};
19use chrono::{DateTime, Utc};
20pub use errors::{ComponentErrorsTotal, ErrorsTotal};
21pub use filter::*;
22pub use output::*;
23pub use received_bytes::{
24    ComponentReceivedBytesThroughput, ComponentReceivedBytesTotal, ReceivedBytesTotal,
25};
26pub use received_events::{
27    ComponentReceivedEventsThroughput, ComponentReceivedEventsTotal, ReceivedEventsTotal,
28};
29pub use sent_bytes::{ComponentSentBytesThroughput, ComponentSentBytesTotal, SentBytesTotal};
30pub use sent_events::{ComponentSentEventsThroughput, ComponentSentEventsTotal, SentEventsTotal};
31pub use sink::{IntoSinkMetrics, SinkMetrics};
32pub use source::{IntoSourceMetrics, SourceMetrics};
33use tokio_stream::{Stream, StreamExt};
34pub use transform::{IntoTransformMetrics, TransformMetrics};
35pub use uptime::Uptime;
36
37use crate::config::ComponentKey;
38
39#[derive(Interface)]
40#[graphql(field(name = "timestamp", ty = "Option<DateTime<Utc>>"))]
41pub enum MetricType {
42    Uptime(Uptime),
43}
44
/// Root type for metrics queries.
///
/// Its only resolver, `host_metrics`, is compiled in when the
/// `sources-host_metrics` feature is enabled.
#[derive(Default)]
pub struct MetricsQuery;

// Gated on the same feature as the `host` module it depends on.
// The `///` text on the resolver is its GraphQL field description, so it is
// part of the published schema.
#[cfg(feature = "sources-host_metrics")]
#[async_graphql::Object]
impl MetricsQuery {
    /// Vector host metrics
    async fn host_metrics(&self) -> host::HostMetrics {
        // A new `HostMetrics` value is constructed for each query.
        host::HostMetrics::new()
    }
}
56
57#[derive(Default)]
58pub struct MetricsSubscription;
59
60#[Subscription]
61impl MetricsSubscription {
62    /// Metrics for how long the Vector instance has been running
63    async fn uptime(
64        &self,
65        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
66    ) -> impl Stream<Item = Uptime> + use<> {
67        get_metrics(interval).filter_map(|m| match m.name() {
68            "uptime_seconds" => Some(Uptime::new(m)),
69            _ => None,
70        })
71    }
72
73    /// Total received events metrics
74    #[graphql(deprecation = "Use component_received_events_totals instead")]
75    async fn received_events_total(
76        &self,
77        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
78    ) -> impl Stream<Item = ReceivedEventsTotal> + use<> {
79        get_metrics(interval).filter_map(|m| match m.name() {
80            "component_received_events_total" => Some(ReceivedEventsTotal::new(m)),
81            _ => None,
82        })
83    }
84
85    /// Total received events throughput sampled over the provided millisecond `interval`
86    #[graphql(deprecation = "Use component_received_events_throughputs instead")]
87    async fn received_events_throughput(
88        &self,
89        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
90    ) -> impl Stream<Item = i64> + use<> {
91        counter_throughput(interval, &|m| m.name() == "component_received_events_total")
92            .map(|(_, throughput)| throughput as i64)
93    }
94
95    /// Total incoming component events throughput metrics over `interval`
96    async fn component_received_events_throughputs(
97        &self,
98        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
99    ) -> impl Stream<Item = Vec<ComponentReceivedEventsThroughput>> + use<> {
100        component_counter_throughputs(interval, &|m| m.name() == "component_received_events_total")
101            .map(|m| {
102                m.into_iter()
103                    .map(|(m, throughput)| {
104                        ComponentReceivedEventsThroughput::new(
105                            ComponentKey::from(m.tag_value("component_id").unwrap()),
106                            throughput as i64,
107                        )
108                    })
109                    .collect()
110            })
111    }
112
113    /// Total received component event metrics over `interval`
114    async fn component_received_events_totals(
115        &self,
116        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
117    ) -> impl Stream<Item = Vec<ComponentReceivedEventsTotal>> + use<> {
118        component_counter_metrics(interval, &|m| m.name() == "component_received_events_total").map(
119            |m| {
120                m.into_iter()
121                    .map(ComponentReceivedEventsTotal::new)
122                    .collect()
123            },
124        )
125    }
126
127    /// Total sent events metrics
128    #[graphql(deprecation = "Use component_sent_events_totals instead")]
129    async fn sent_events_total(
130        &self,
131        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
132    ) -> impl Stream<Item = SentEventsTotal> + use<> {
133        get_metrics(interval).filter_map(|m| match m.name() {
134            "component_sent_events_total" => Some(SentEventsTotal::new(m)),
135            _ => None,
136        })
137    }
138
139    /// Total outgoing events throughput sampled over the provided millisecond `interval`
140    #[graphql(deprecation = "Use component_sent_events_throughputs instead")]
141    async fn sent_events_throughput(
142        &self,
143        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
144    ) -> impl Stream<Item = i64> + use<> {
145        counter_throughput(interval, &|m| m.name() == "component_sent_events_total")
146            .map(|(_, throughput)| throughput as i64)
147    }
148
149    /// Total outgoing component event throughput metrics over `interval`
150    async fn component_sent_events_throughputs(
151        &self,
152        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
153    ) -> impl Stream<Item = Vec<ComponentSentEventsThroughput>> + use<> {
154        component_sent_events_total_throughputs_with_outputs(interval).map(|m| {
155            m.into_iter()
156                .map(|(key, total_throughput, outputs)| {
157                    ComponentSentEventsThroughput::new(key, total_throughput, outputs)
158                })
159                .collect()
160        })
161    }
162
163    /// Total outgoing component event metrics over `interval`
164    async fn component_sent_events_totals(
165        &self,
166        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
167    ) -> impl Stream<Item = Vec<ComponentSentEventsTotal>> + use<> {
168        component_sent_events_totals_metrics_with_outputs(interval).map(|ms| {
169            ms.into_iter()
170                .map(|(m, m_by_outputs)| ComponentSentEventsTotal::new(m, m_by_outputs))
171                .collect()
172        })
173    }
174
175    /// Component bytes received metrics over `interval`.
176    async fn component_received_bytes_totals(
177        &self,
178        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
179    ) -> impl Stream<Item = Vec<ComponentReceivedBytesTotal>> + use<> {
180        component_counter_metrics(interval, &|m| m.name() == "component_received_bytes_total").map(
181            |m| {
182                m.into_iter()
183                    .map(ComponentReceivedBytesTotal::new)
184                    .collect()
185            },
186        )
187    }
188
189    /// Component bytes received throughput over `interval`
190    async fn component_received_bytes_throughputs(
191        &self,
192        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
193    ) -> impl Stream<Item = Vec<ComponentReceivedBytesThroughput>> + use<> {
194        component_counter_throughputs(interval, &|m| m.name() == "component_received_bytes_total")
195            .map(|m| {
196                m.into_iter()
197                    .map(|(m, throughput)| {
198                        ComponentReceivedBytesThroughput::new(
199                            ComponentKey::from(m.tag_value("component_id").unwrap()),
200                            throughput as i64,
201                        )
202                    })
203                    .collect()
204            })
205    }
206
207    /// Component bytes sent metrics over `interval`.
208    async fn component_sent_bytes_totals(
209        &self,
210        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
211    ) -> impl Stream<Item = Vec<ComponentSentBytesTotal>> + use<> {
212        component_counter_metrics(interval, &|m| m.name() == "component_sent_bytes_total")
213            .map(|m| m.into_iter().map(ComponentSentBytesTotal::new).collect())
214    }
215
216    /// Component bytes sent throughput over `interval`
217    async fn component_sent_bytes_throughputs(
218        &self,
219        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
220    ) -> impl Stream<Item = Vec<ComponentSentBytesThroughput>> + use<> {
221        component_counter_throughputs(interval, &|m| m.name() == "component_sent_bytes_total").map(
222            |m| {
223                m.into_iter()
224                    .map(|(m, throughput)| {
225                        ComponentSentBytesThroughput::new(
226                            ComponentKey::from(m.tag_value("component_id").unwrap()),
227                            throughput as i64,
228                        )
229                    })
230                    .collect()
231            },
232        )
233    }
234
235    /// Total error metrics.
236    async fn errors_total(
237        &self,
238        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
239    ) -> impl Stream<Item = ErrorsTotal> + use<> {
240        get_metrics(interval)
241            .filter(|m| m.name().ends_with("_errors_total"))
242            .map(ErrorsTotal::new)
243    }
244
245    /// Allocated bytes metrics.
246    async fn allocated_bytes(
247        &self,
248        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
249    ) -> impl Stream<Item = AllocatedBytes> + use<> {
250        get_metrics(interval)
251            .filter(|m| m.name() == "component_allocated_bytes")
252            .map(AllocatedBytes::new)
253    }
254
255    /// Component allocation metrics
256    async fn component_allocated_bytes(
257        &self,
258        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
259    ) -> impl Stream<Item = Vec<ComponentAllocatedBytes>> + use<> {
260        component_gauge_metrics(interval, &|m| m.name() == "component_allocated_bytes")
261            .map(|m| m.into_iter().map(ComponentAllocatedBytes::new).collect())
262    }
263
264    /// Component error metrics over `interval`.
265    async fn component_errors_totals(
266        &self,
267        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
268    ) -> impl Stream<Item = Vec<ComponentErrorsTotal>> + use<> {
269        component_counter_metrics(interval, &|m| m.name().ends_with("_errors_total"))
270            .map(|m| m.into_iter().map(ComponentErrorsTotal::new).collect())
271    }
272
273    /// All metrics.
274    async fn metrics(
275        &self,
276        #[graphql(default = 1000, validator(minimum = 10, maximum = 60_000))] interval: i32,
277    ) -> impl Stream<Item = MetricType> + use<> {
278        get_metrics(interval).filter_map(|m| match m.name() {
279            "uptime_seconds" => Some(MetricType::Uptime(m.into())),
280            _ => None,
281        })
282    }
283}