1use std::{cmp::Ordering, collections::BTreeMap};
2
3use async_graphql::{Enum, InputObject, Object};
4
5use crate::{
6 api::schema::{
7 filter::{filter_items, CustomFilter, StringFilter},
8 metrics::{self, MetricsFilter},
9 relay, sort,
10 },
11 event::Metric,
12 filter_check,
13};
14
15#[derive(Clone)]
16pub struct FileSourceMetricFile<'a> {
17 name: String,
18 metrics: Vec<&'a Metric>,
19}
20
21impl<'a> FileSourceMetricFile<'a> {
22 #[allow(clippy::missing_const_for_fn)] fn from_tuple((name, metrics): (String, Vec<&'a Metric>)) -> Self {
25 Self { name, metrics }
26 }
27
28 pub fn get_name(&self) -> &str {
29 self.name.as_str()
30 }
31}
32
33#[Object]
34impl FileSourceMetricFile<'_> {
35 async fn name(&self) -> &str {
37 &*self.name
38 }
39
40 async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
42 self.metrics.received_bytes_total()
43 }
44
45 async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
47 self.metrics.received_events_total()
48 }
49
50 async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
52 self.metrics.sent_events_total()
53 }
54}
55
56#[derive(Debug, Clone)]
57pub struct FileSourceMetrics(Vec<Metric>);
58
59impl FileSourceMetrics {
60 pub const fn new(metrics: Vec<Metric>) -> Self {
61 Self(metrics)
62 }
63
64 pub fn get_files(&self) -> Vec<FileSourceMetricFile<'_>> {
65 self.0
66 .iter()
67 .filter_map(|m| m.tag_value("file").map(|file| (file, m)))
68 .fold(
69 BTreeMap::new(),
70 |mut map: BTreeMap<String, Vec<&Metric>>, (file, m)| {
71 map.entry(file).or_default().push(m);
72 map
73 },
74 )
75 .into_iter()
76 .map(FileSourceMetricFile::from_tuple)
77 .collect()
78 }
79}
80
81#[derive(Enum, Copy, Clone, Eq, PartialEq)]
82pub enum FileSourceMetricFilesSortFieldName {
83 Name,
84 ReceivedBytesTotal,
85 ReceivedEventsTotal,
86 SentEventsTotal,
87}
88
89impl sort::SortableByField<FileSourceMetricFilesSortFieldName> for FileSourceMetricFile<'_> {
90 fn sort(&self, rhs: &Self, field: &FileSourceMetricFilesSortFieldName) -> Ordering {
91 match field {
92 FileSourceMetricFilesSortFieldName::Name => Ord::cmp(&self.name, &rhs.name),
93 FileSourceMetricFilesSortFieldName::ReceivedBytesTotal => Ord::cmp(
94 &self
95 .metrics
96 .received_bytes_total()
97 .map(|m| m.get_received_bytes_total() as i64)
98 .unwrap_or(0),
99 &rhs.metrics
100 .received_bytes_total()
101 .map(|m| m.get_received_bytes_total() as i64)
102 .unwrap_or(0),
103 ),
104 FileSourceMetricFilesSortFieldName::ReceivedEventsTotal => Ord::cmp(
105 &self
106 .metrics
107 .received_events_total()
108 .map(|m| m.get_received_events_total() as i64)
109 .unwrap_or(0),
110 &rhs.metrics
111 .received_events_total()
112 .map(|m| m.get_received_events_total() as i64)
113 .unwrap_or(0),
114 ),
115 FileSourceMetricFilesSortFieldName::SentEventsTotal => Ord::cmp(
116 &self
117 .metrics
118 .sent_events_total()
119 .map(|m| m.get_sent_events_total() as i64)
120 .unwrap_or(0),
121 &rhs.metrics
122 .sent_events_total()
123 .map(|m| m.get_sent_events_total() as i64)
124 .unwrap_or(0),
125 ),
126 }
127 }
128}
129
130#[derive(Default, InputObject)]
131pub struct FileSourceMetricsFilesFilter {
132 name: Option<Vec<StringFilter>>,
133 or: Option<Vec<Self>>,
134}
135
136impl CustomFilter<FileSourceMetricFile<'_>> for FileSourceMetricsFilesFilter {
137 fn matches(&self, file: &FileSourceMetricFile<'_>) -> bool {
138 filter_check!(self
139 .name
140 .as_ref()
141 .map(|f| f.iter().all(|f| f.filter_value(file.get_name()))));
142 true
143 }
144
145 fn or(&self) -> Option<&Vec<Self>> {
146 self.or.as_ref()
147 }
148}
149
150#[allow(clippy::too_many_arguments)]
151#[Object]
152impl FileSourceMetrics {
153 pub async fn files(
155 &self,
156 after: Option<String>,
157 before: Option<String>,
158 first: Option<i32>,
159 last: Option<i32>,
160 filter: Option<FileSourceMetricsFilesFilter>,
161 sort: Option<Vec<sort::SortField<FileSourceMetricFilesSortFieldName>>>,
162 ) -> relay::ConnectionResult<FileSourceMetricFile<'_>> {
163 let filter = filter.unwrap_or_default();
164 let mut files = filter_items(self.get_files().into_iter(), &filter);
165
166 if let Some(sort_fields) = sort {
167 sort::by_fields(&mut files, &sort_fields);
168 }
169
170 relay::query(
171 files.into_iter(),
172 relay::Params::new(after, before, first, last),
173 10,
174 )
175 .await
176 }
177
178 pub async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
180 self.0.received_bytes_total()
181 }
182
183 pub async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
185 self.0.received_events_total()
186 }
187
188 pub async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
190 self.0.sent_events_total()
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197 use crate::{
198 api::schema::sort::SortField,
199 event::{MetricKind, MetricValue},
200 };
201
202 struct FileSourceMetricTest {
203 name: &'static str,
204 events_metric: Metric,
205 bytes_metric: Metric,
206 }
207
208 impl FileSourceMetricTest {
209 fn new(name: &'static str, events_processed: f64, bytes_processed: f64) -> Self {
210 Self {
211 name,
212 events_metric: metric("component_sent_events_total", events_processed),
213 bytes_metric: metric("component_received_bytes_total", bytes_processed),
214 }
215 }
216
217 fn get_metric(&self) -> FileSourceMetricFile {
218 FileSourceMetricFile::from_tuple((
219 self.name.to_string(),
220 vec![&self.bytes_metric, &self.events_metric],
221 ))
222 }
223 }
224
225 fn metric(name: &str, value: f64) -> Metric {
226 Metric::new(
227 name,
228 MetricKind::Incremental,
229 MetricValue::Counter { value },
230 )
231 }
232
233 fn by_name(name: &'static str) -> FileSourceMetricTest {
234 FileSourceMetricTest::new(name, 0.00, 0.00)
235 }
236
237 #[test]
238 fn sort_name_asc() {
239 let t1 = by_name("/path/to/file/2");
240 let t2 = by_name("/path/to/file/3");
241 let t3 = by_name("/path/to/file/1");
242
243 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
244 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
245 field: FileSourceMetricFilesSortFieldName::Name,
246 direction: sort::Direction::Asc,
247 }];
248
249 sort::by_fields(&mut files, &fields);
250
251 for (i, f) in ["1", "2", "3"].iter().enumerate() {
252 assert_eq!(files[i].name.as_str(), format!("/path/to/file/{f}"));
253 }
254 }
255
256 #[test]
257 fn sort_name_desc() {
258 let t1 = by_name("/path/to/file/2");
259 let t2 = by_name("/path/to/file/3");
260 let t3 = by_name("/path/to/file/1");
261
262 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
263 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
264 field: FileSourceMetricFilesSortFieldName::Name,
265 direction: sort::Direction::Desc,
266 }];
267
268 sort::by_fields(&mut files, &fields);
269
270 for (i, f) in ["3", "2", "1"].iter().enumerate() {
271 assert_eq!(files[i].name.as_str(), format!("/path/to/file/{f}"));
272 }
273 }
274
275 #[test]
276 fn processed_events_asc() {
277 let t1 = FileSourceMetricTest::new("a", 1000.00, 100.00);
278 let t2 = FileSourceMetricTest::new("b", 500.00, 300.00);
279 let t3 = FileSourceMetricTest::new("c", 250.00, 200.00);
280
281 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
282 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
283 field: FileSourceMetricFilesSortFieldName::SentEventsTotal,
284 direction: sort::Direction::Asc,
285 }];
286
287 sort::by_fields(&mut files, &fields);
288
289 for (i, f) in ["c", "b", "a"].iter().enumerate() {
290 assert_eq!(&files[i].name, *f);
291 }
292 }
293
294 #[test]
295 fn processed_events_desc() {
296 let t1 = FileSourceMetricTest::new("a", 1000.00, 100.00);
297 let t2 = FileSourceMetricTest::new("b", 500.00, 300.00);
298 let t3 = FileSourceMetricTest::new("c", 250.00, 200.00);
299
300 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
301 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
302 field: FileSourceMetricFilesSortFieldName::SentEventsTotal,
303 direction: sort::Direction::Desc,
304 }];
305
306 sort::by_fields(&mut files, &fields);
307
308 for (i, f) in ["a", "b", "c"].iter().enumerate() {
309 assert_eq!(&files[i].name, *f);
310 }
311 }
312
313 #[test]
314 fn received_bytes_asc() {
315 let t1 = FileSourceMetricTest::new("a", 1000.00, 100.00);
316 let t2 = FileSourceMetricTest::new("b", 500.00, 300.00);
317 let t3 = FileSourceMetricTest::new("c", 250.00, 200.00);
318
319 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
320 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
321 field: FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
322 direction: sort::Direction::Asc,
323 }];
324
325 sort::by_fields(&mut files, &fields);
326
327 for (i, f) in ["a", "c", "b"].iter().enumerate() {
328 assert_eq!(&files[i].name, *f);
329 }
330 }
331
332 #[test]
333 fn received_bytes_desc() {
334 let t1 = FileSourceMetricTest::new("a", 1000.00, 100.00);
335 let t2 = FileSourceMetricTest::new("b", 500.00, 300.00);
336 let t3 = FileSourceMetricTest::new("c", 250.00, 200.00);
337
338 let mut files = vec![t1.get_metric(), t2.get_metric(), t3.get_metric()];
339 let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> {
340 field: FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
341 direction: sort::Direction::Desc,
342 }];
343
344 sort::by_fields(&mut files, &fields);
345
346 for (i, f) in ["b", "c", "a"].iter().enumerate() {
347 assert_eq!(&files[i].name, *f);
348 }
349 }
350}