vector/api/schema/metrics/source/
file.rs

1use std::{cmp::Ordering, collections::BTreeMap};
2
3use async_graphql::{Enum, InputObject, Object};
4
5use crate::{
6    api::schema::{
7        filter::{CustomFilter, StringFilter, filter_items},
8        metrics::{self, MetricsFilter},
9        relay, sort,
10    },
11    event::Metric,
12    filter_check,
13};
14
15#[derive(Clone)]
16pub struct FileSourceMetricFile<'a> {
17    name: String,
18    metrics: Vec<&'a Metric>,
19}
20
21impl<'a> FileSourceMetricFile<'a> {
22    /// Returns a new FileSourceMetricFile from a (name, Vec<&Metric>) tuple
23    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
24    fn from_tuple((name, metrics): (String, Vec<&'a Metric>)) -> Self {
25        Self { name, metrics }
26    }
27
28    pub const fn get_name(&self) -> &str {
29        self.name.as_str()
30    }
31}
32
33#[Object]
34impl FileSourceMetricFile<'_> {
35    /// File name
36    async fn name(&self) -> &str {
37        &*self.name
38    }
39
40    /// Metric indicating bytes received for the current file
41    async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
42        self.metrics.received_bytes_total()
43    }
44
45    /// Metric indicating received events for the current file
46    async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
47        self.metrics.received_events_total()
48    }
49
50    /// Metric indicating outgoing events for the current file
51    async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
52        self.metrics.sent_events_total()
53    }
54}
55
56#[derive(Debug, Clone)]
57pub struct FileSourceMetrics(Vec<Metric>);
58
59impl FileSourceMetrics {
60    pub const fn new(metrics: Vec<Metric>) -> Self {
61        Self(metrics)
62    }
63
64    pub fn get_files(&self) -> Vec<FileSourceMetricFile<'_>> {
65        self.0
66            .iter()
67            .filter_map(|m| m.tag_value("file").map(|file| (file, m)))
68            .fold(
69                BTreeMap::new(),
70                |mut map: BTreeMap<String, Vec<&Metric>>, (file, m)| {
71                    map.entry(file).or_default().push(m);
72                    map
73                },
74            )
75            .into_iter()
76            .map(FileSourceMetricFile::from_tuple)
77            .collect()
78    }
79}
80
81#[derive(Enum, Copy, Clone, Eq, PartialEq)]
82pub enum FileSourceMetricFilesSortFieldName {
83    Name,
84    ReceivedBytesTotal,
85    ReceivedEventsTotal,
86    SentEventsTotal,
87}
88
89impl sort::SortableByField<FileSourceMetricFilesSortFieldName> for FileSourceMetricFile<'_> {
90    fn sort(&self, rhs: &Self, field: &FileSourceMetricFilesSortFieldName) -> Ordering {
91        match field {
92            FileSourceMetricFilesSortFieldName::Name => Ord::cmp(&self.name, &rhs.name),
93            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal => Ord::cmp(
94                &self
95                    .metrics
96                    .received_bytes_total()
97                    .map(|m| m.get_received_bytes_total() as i64)
98                    .unwrap_or(0),
99                &rhs.metrics
100                    .received_bytes_total()
101                    .map(|m| m.get_received_bytes_total() as i64)
102                    .unwrap_or(0),
103            ),
104            FileSourceMetricFilesSortFieldName::ReceivedEventsTotal => Ord::cmp(
105                &self
106                    .metrics
107                    .received_events_total()
108                    .map(|m| m.get_received_events_total() as i64)
109                    .unwrap_or(0),
110                &rhs.metrics
111                    .received_events_total()
112                    .map(|m| m.get_received_events_total() as i64)
113                    .unwrap_or(0),
114            ),
115            FileSourceMetricFilesSortFieldName::SentEventsTotal => Ord::cmp(
116                &self
117                    .metrics
118                    .sent_events_total()
119                    .map(|m| m.get_sent_events_total() as i64)
120                    .unwrap_or(0),
121                &rhs.metrics
122                    .sent_events_total()
123                    .map(|m| m.get_sent_events_total() as i64)
124                    .unwrap_or(0),
125            ),
126        }
127    }
128}
129
130#[derive(Default, InputObject)]
131pub struct FileSourceMetricsFilesFilter {
132    name: Option<Vec<StringFilter>>,
133    or: Option<Vec<Self>>,
134}
135
136impl CustomFilter<FileSourceMetricFile<'_>> for FileSourceMetricsFilesFilter {
137    fn matches(&self, file: &FileSourceMetricFile<'_>) -> bool {
138        filter_check!(
139            self.name
140                .as_ref()
141                .map(|f| f.iter().all(|f| f.filter_value(file.get_name())))
142        );
143        true
144    }
145
146    fn or(&self) -> Option<&Vec<Self>> {
147        self.or.as_ref()
148    }
149}
150
151#[allow(clippy::too_many_arguments)]
152#[Object]
153impl FileSourceMetrics {
154    /// File metrics
155    pub async fn files(
156        &self,
157        after: Option<String>,
158        before: Option<String>,
159        first: Option<i32>,
160        last: Option<i32>,
161        filter: Option<FileSourceMetricsFilesFilter>,
162        sort: Option<Vec<sort::SortField<FileSourceMetricFilesSortFieldName>>>,
163    ) -> relay::ConnectionResult<FileSourceMetricFile<'_>> {
164        let filter = filter.unwrap_or_default();
165        let mut files = filter_items(self.get_files().into_iter(), &filter);
166
167        if let Some(sort_fields) = sort {
168            sort::by_fields(&mut files, &sort_fields);
169        }
170
171        relay::query(
172            files.into_iter(),
173            relay::Params::new(after, before, first, last),
174            10,
175        )
176        .await
177    }
178
179    /// Total received bytes for the current file source
180    pub async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
181        self.0.received_bytes_total()
182    }
183
184    /// Total received events for the current file source
185    pub async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
186        self.0.received_events_total()
187    }
188
189    /// Total sent events for the current file source
190    pub async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
191        self.0.sent_events_total()
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        api::schema::sort::SortField,
        event::{MetricKind, MetricValue},
    };

    /// Owns the metrics that a `FileSourceMetricFile` under test borrows.
    struct FileSourceMetricTest {
        name: &'static str,
        events_metric: Metric,
        bytes_metric: Metric,
    }

    impl FileSourceMetricTest {
        /// Creates a fixture with one sent-events counter and one
        /// received-bytes counter.
        fn new(name: &'static str, events_processed: f64, bytes_processed: f64) -> Self {
            let events_metric = metric("component_sent_events_total", events_processed);
            let bytes_metric = metric("component_received_bytes_total", bytes_processed);
            Self {
                name,
                events_metric,
                bytes_metric,
            }
        }

        /// Borrows the fixture's metrics as a `FileSourceMetricFile`.
        fn get_metric(&self) -> FileSourceMetricFile<'_> {
            let borrowed = vec![&self.bytes_metric, &self.events_metric];
            FileSourceMetricFile::from_tuple((self.name.to_string(), borrowed))
        }
    }

    /// Builds an incremental counter metric called `name` holding `value`.
    fn metric(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    /// Fixture for tests that only care about the file name.
    fn by_name(name: &'static str) -> FileSourceMetricTest {
        FileSourceMetricTest::new(name, 0.00, 0.00)
    }

    /// Three files with unordered names, used by the name-sorting tests.
    fn named_fixtures() -> [FileSourceMetricTest; 3] {
        [
            by_name("/path/to/file/2"),
            by_name("/path/to/file/3"),
            by_name("/path/to/file/1"),
        ]
    }

    /// Three files whose event and byte totals order differently, used by the
    /// metric-sorting tests.
    fn total_fixtures() -> [FileSourceMetricTest; 3] {
        [
            FileSourceMetricTest::new("a", 1000.00, 100.00),
            FileSourceMetricTest::new("b", 500.00, 300.00),
            FileSourceMetricTest::new("c", 250.00, 200.00),
        ]
    }

    /// Sorts the files built from `fixtures` by a single `field` in
    /// `direction` and returns the resulting file names in order.
    fn sorted_names(
        fixtures: &[FileSourceMetricTest],
        field: FileSourceMetricFilesSortFieldName,
        direction: sort::Direction,
    ) -> Vec<String> {
        let mut files: Vec<_> = fixtures.iter().map(|fixture| fixture.get_metric()).collect();
        let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> { field, direction }];

        sort::by_fields(&mut files, &fields);

        files.iter().map(|file| file.name.clone()).collect()
    }

    #[test]
    fn sort_name_asc() {
        let names = sorted_names(
            &named_fixtures(),
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Asc,
        );

        assert_eq!(
            names,
            ["/path/to/file/1", "/path/to/file/2", "/path/to/file/3"]
        );
    }

    #[test]
    fn sort_name_desc() {
        let names = sorted_names(
            &named_fixtures(),
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Desc,
        );

        assert_eq!(
            names,
            ["/path/to/file/3", "/path/to/file/2", "/path/to/file/1"]
        );
    }

    #[test]
    fn processed_events_asc() {
        let names = sorted_names(
            &total_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Asc,
        );

        assert_eq!(names, ["c", "b", "a"]);
    }

    #[test]
    fn processed_events_desc() {
        let names = sorted_names(
            &total_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Desc,
        );

        assert_eq!(names, ["a", "b", "c"]);
    }

    #[test]
    fn received_bytes_asc() {
        let names = sorted_names(
            &total_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Asc,
        );

        assert_eq!(names, ["a", "c", "b"]);
    }

    #[test]
    fn received_bytes_desc() {
        let names = sorted_names(
            &total_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Desc,
        );

        assert_eq!(names, ["b", "c", "a"]);
    }
}