1use std::{cmp::Ordering, collections::BTreeMap};
2
3use async_graphql::{Enum, InputObject, Object};
4
5use crate::{
6 api::schema::{
7 filter::{CustomFilter, StringFilter, filter_items},
8 metrics::{self, MetricsFilter},
9 relay, sort,
10 },
11 event::Metric,
12 filter_check,
13};
14
15#[derive(Clone)]
16pub struct FileSourceMetricFile<'a> {
17 name: String,
18 metrics: Vec<&'a Metric>,
19}
20
21impl<'a> FileSourceMetricFile<'a> {
22 #[allow(clippy::missing_const_for_fn)] fn from_tuple((name, metrics): (String, Vec<&'a Metric>)) -> Self {
25 Self { name, metrics }
26 }
27
28 pub const fn get_name(&self) -> &str {
29 self.name.as_str()
30 }
31}
32
33#[Object]
34impl FileSourceMetricFile<'_> {
35 async fn name(&self) -> &str {
37 &*self.name
38 }
39
40 async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
42 self.metrics.received_bytes_total()
43 }
44
45 async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
47 self.metrics.received_events_total()
48 }
49
50 async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
52 self.metrics.sent_events_total()
53 }
54}
55
56#[derive(Debug, Clone)]
57pub struct FileSourceMetrics(Vec<Metric>);
58
59impl FileSourceMetrics {
60 pub const fn new(metrics: Vec<Metric>) -> Self {
61 Self(metrics)
62 }
63
64 pub fn get_files(&self) -> Vec<FileSourceMetricFile<'_>> {
65 self.0
66 .iter()
67 .filter_map(|m| m.tag_value("file").map(|file| (file, m)))
68 .fold(
69 BTreeMap::new(),
70 |mut map: BTreeMap<String, Vec<&Metric>>, (file, m)| {
71 map.entry(file).or_default().push(m);
72 map
73 },
74 )
75 .into_iter()
76 .map(FileSourceMetricFile::from_tuple)
77 .collect()
78 }
79}
80
// Fields by which a list of `FileSourceMetricFile` entries can be ordered.
// Each variant is handled by the `SortableByField` implementation for
// `FileSourceMetricFile`.
//
// Plain `//` comments are used so that the descriptions generated by the
// `Enum` derive stay unchanged.
#[derive(Enum, Copy, Clone, Eq, PartialEq)]
pub enum FileSourceMetricFilesSortFieldName {
    // Order by file name.
    Name,
    // Order by the received-bytes metric value.
    ReceivedBytesTotal,
    // Order by the received-events metric value.
    ReceivedEventsTotal,
    // Order by the sent-events metric value.
    SentEventsTotal,
}
88
89impl sort::SortableByField<FileSourceMetricFilesSortFieldName> for FileSourceMetricFile<'_> {
90 fn sort(&self, rhs: &Self, field: &FileSourceMetricFilesSortFieldName) -> Ordering {
91 match field {
92 FileSourceMetricFilesSortFieldName::Name => Ord::cmp(&self.name, &rhs.name),
93 FileSourceMetricFilesSortFieldName::ReceivedBytesTotal => Ord::cmp(
94 &self
95 .metrics
96 .received_bytes_total()
97 .map(|m| m.get_received_bytes_total() as i64)
98 .unwrap_or(0),
99 &rhs.metrics
100 .received_bytes_total()
101 .map(|m| m.get_received_bytes_total() as i64)
102 .unwrap_or(0),
103 ),
104 FileSourceMetricFilesSortFieldName::ReceivedEventsTotal => Ord::cmp(
105 &self
106 .metrics
107 .received_events_total()
108 .map(|m| m.get_received_events_total() as i64)
109 .unwrap_or(0),
110 &rhs.metrics
111 .received_events_total()
112 .map(|m| m.get_received_events_total() as i64)
113 .unwrap_or(0),
114 ),
115 FileSourceMetricFilesSortFieldName::SentEventsTotal => Ord::cmp(
116 &self
117 .metrics
118 .sent_events_total()
119 .map(|m| m.get_sent_events_total() as i64)
120 .unwrap_or(0),
121 &rhs.metrics
122 .sent_events_total()
123 .map(|m| m.get_sent_events_total() as i64)
124 .unwrap_or(0),
125 ),
126 }
127 }
128}
129
130#[derive(Default, InputObject)]
131pub struct FileSourceMetricsFilesFilter {
132 name: Option<Vec<StringFilter>>,
133 or: Option<Vec<Self>>,
134}
135
136impl CustomFilter<FileSourceMetricFile<'_>> for FileSourceMetricsFilesFilter {
137 fn matches(&self, file: &FileSourceMetricFile<'_>) -> bool {
138 filter_check!(
139 self.name
140 .as_ref()
141 .map(|f| f.iter().all(|f| f.filter_value(file.get_name())))
142 );
143 true
144 }
145
146 fn or(&self) -> Option<&Vec<Self>> {
147 self.or.as_ref()
148 }
149}
150
151#[allow(clippy::too_many_arguments)]
152#[Object]
153impl FileSourceMetrics {
154 pub async fn files(
156 &self,
157 after: Option<String>,
158 before: Option<String>,
159 first: Option<i32>,
160 last: Option<i32>,
161 filter: Option<FileSourceMetricsFilesFilter>,
162 sort: Option<Vec<sort::SortField<FileSourceMetricFilesSortFieldName>>>,
163 ) -> relay::ConnectionResult<FileSourceMetricFile<'_>> {
164 let filter = filter.unwrap_or_default();
165 let mut files = filter_items(self.get_files().into_iter(), &filter);
166
167 if let Some(sort_fields) = sort {
168 sort::by_fields(&mut files, &sort_fields);
169 }
170
171 relay::query(
172 files.into_iter(),
173 relay::Params::new(after, before, first, last),
174 10,
175 )
176 .await
177 }
178
179 pub async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
181 self.0.received_bytes_total()
182 }
183
184 pub async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
186 self.0.received_events_total()
187 }
188
189 pub async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
191 self.0.sent_events_total()
192 }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        api::schema::sort::SortField,
        event::{MetricKind, MetricValue},
    };

    /// Test fixture that owns the metrics a `FileSourceMetricFile` borrows.
    struct FileSourceMetricTest {
        name: &'static str,
        events_metric: Metric,
        bytes_metric: Metric,
    }

    impl FileSourceMetricTest {
        /// Creates a fixture with one sent-events counter and one received-bytes counter.
        fn new(name: &'static str, events_processed: f64, bytes_processed: f64) -> Self {
            let events_metric = metric("component_sent_events_total", events_processed);
            let bytes_metric = metric("component_received_bytes_total", bytes_processed);

            Self {
                name,
                events_metric,
                bytes_metric,
            }
        }

        /// Borrows the fixture as the type under test.
        fn get_metric(&self) -> FileSourceMetricFile<'_> {
            let metrics = vec![&self.bytes_metric, &self.events_metric];
            FileSourceMetricFile::from_tuple((self.name.to_string(), metrics))
        }
    }

    /// Builds an incremental counter metric with the given name and value.
    fn metric(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    /// Fixture whose counters are zero, for tests that only care about the name.
    fn by_name(name: &'static str) -> FileSourceMetricTest {
        FileSourceMetricTest::new(name, 0.00, 0.00)
    }

    /// Three fixtures with out-of-order path names.
    fn path_fixtures() -> [FileSourceMetricTest; 3] {
        [
            by_name("/path/to/file/2"),
            by_name("/path/to/file/3"),
            by_name("/path/to/file/1"),
        ]
    }

    /// Three fixtures whose event and byte counters sort in different orders.
    fn counter_fixtures() -> [FileSourceMetricTest; 3] {
        [
            FileSourceMetricTest::new("a", 1000.00, 100.00),
            FileSourceMetricTest::new("b", 500.00, 300.00),
            FileSourceMetricTest::new("c", 250.00, 200.00),
        ]
    }

    /// Sorts the fixtures on a single field and checks the resulting name order.
    fn assert_sorted_names(
        fixtures: &[FileSourceMetricTest],
        field: FileSourceMetricFilesSortFieldName,
        direction: sort::Direction,
        expected: &[&str],
    ) {
        let mut files: Vec<_> = fixtures
            .iter()
            .map(|fixture| fixture.get_metric())
            .collect();
        let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> { field, direction }];

        sort::by_fields(&mut files, &fields);

        let actual: Vec<&str> = files.iter().map(|file| file.name.as_str()).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn sort_name_asc() {
        assert_sorted_names(
            &path_fixtures(),
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Asc,
            &["/path/to/file/1", "/path/to/file/2", "/path/to/file/3"],
        );
    }

    #[test]
    fn sort_name_desc() {
        assert_sorted_names(
            &path_fixtures(),
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Desc,
            &["/path/to/file/3", "/path/to/file/2", "/path/to/file/1"],
        );
    }

    #[test]
    fn processed_events_asc() {
        assert_sorted_names(
            &counter_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Asc,
            &["c", "b", "a"],
        );
    }

    #[test]
    fn processed_events_desc() {
        assert_sorted_names(
            &counter_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Desc,
            &["a", "b", "c"],
        );
    }

    #[test]
    fn received_bytes_asc() {
        assert_sorted_names(
            &counter_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Asc,
            &["a", "c", "b"],
        );
    }

    #[test]
    fn received_bytes_desc() {
        assert_sorted_names(
            &counter_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Desc,
            &["b", "c", "a"],
        );
    }
}