vector/api/schema/metrics/source/
file.rs

1use std::{cmp::Ordering, collections::BTreeMap};
2
3use async_graphql::{Enum, InputObject, Object};
4
5use crate::{
6    api::schema::{
7        filter::{filter_items, CustomFilter, StringFilter},
8        metrics::{self, MetricsFilter},
9        relay, sort,
10    },
11    event::Metric,
12    filter_check,
13};
14
15#[derive(Clone)]
16pub struct FileSourceMetricFile<'a> {
17    name: String,
18    metrics: Vec<&'a Metric>,
19}
20
21impl<'a> FileSourceMetricFile<'a> {
22    /// Returns a new FileSourceMetricFile from a (name, Vec<&Metric>) tuple
23    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
24    fn from_tuple((name, metrics): (String, Vec<&'a Metric>)) -> Self {
25        Self { name, metrics }
26    }
27
28    pub fn get_name(&self) -> &str {
29        self.name.as_str()
30    }
31}
32
33#[Object]
34impl FileSourceMetricFile<'_> {
35    /// File name
36    async fn name(&self) -> &str {
37        &*self.name
38    }
39
40    /// Metric indicating bytes received for the current file
41    async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
42        self.metrics.received_bytes_total()
43    }
44
45    /// Metric indicating received events for the current file
46    async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
47        self.metrics.received_events_total()
48    }
49
50    /// Metric indicating outgoing events for the current file
51    async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
52        self.metrics.sent_events_total()
53    }
54}
55
56#[derive(Debug, Clone)]
57pub struct FileSourceMetrics(Vec<Metric>);
58
59impl FileSourceMetrics {
60    pub const fn new(metrics: Vec<Metric>) -> Self {
61        Self(metrics)
62    }
63
64    pub fn get_files(&self) -> Vec<FileSourceMetricFile<'_>> {
65        self.0
66            .iter()
67            .filter_map(|m| m.tag_value("file").map(|file| (file, m)))
68            .fold(
69                BTreeMap::new(),
70                |mut map: BTreeMap<String, Vec<&Metric>>, (file, m)| {
71                    map.entry(file).or_default().push(m);
72                    map
73                },
74            )
75            .into_iter()
76            .map(FileSourceMetricFile::from_tuple)
77            .collect()
78    }
79}
80
81#[derive(Enum, Copy, Clone, Eq, PartialEq)]
82pub enum FileSourceMetricFilesSortFieldName {
83    Name,
84    ReceivedBytesTotal,
85    ReceivedEventsTotal,
86    SentEventsTotal,
87}
88
89impl sort::SortableByField<FileSourceMetricFilesSortFieldName> for FileSourceMetricFile<'_> {
90    fn sort(&self, rhs: &Self, field: &FileSourceMetricFilesSortFieldName) -> Ordering {
91        match field {
92            FileSourceMetricFilesSortFieldName::Name => Ord::cmp(&self.name, &rhs.name),
93            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal => Ord::cmp(
94                &self
95                    .metrics
96                    .received_bytes_total()
97                    .map(|m| m.get_received_bytes_total() as i64)
98                    .unwrap_or(0),
99                &rhs.metrics
100                    .received_bytes_total()
101                    .map(|m| m.get_received_bytes_total() as i64)
102                    .unwrap_or(0),
103            ),
104            FileSourceMetricFilesSortFieldName::ReceivedEventsTotal => Ord::cmp(
105                &self
106                    .metrics
107                    .received_events_total()
108                    .map(|m| m.get_received_events_total() as i64)
109                    .unwrap_or(0),
110                &rhs.metrics
111                    .received_events_total()
112                    .map(|m| m.get_received_events_total() as i64)
113                    .unwrap_or(0),
114            ),
115            FileSourceMetricFilesSortFieldName::SentEventsTotal => Ord::cmp(
116                &self
117                    .metrics
118                    .sent_events_total()
119                    .map(|m| m.get_sent_events_total() as i64)
120                    .unwrap_or(0),
121                &rhs.metrics
122                    .sent_events_total()
123                    .map(|m| m.get_sent_events_total() as i64)
124                    .unwrap_or(0),
125            ),
126        }
127    }
128}
129
130#[derive(Default, InputObject)]
131pub struct FileSourceMetricsFilesFilter {
132    name: Option<Vec<StringFilter>>,
133    or: Option<Vec<Self>>,
134}
135
136impl CustomFilter<FileSourceMetricFile<'_>> for FileSourceMetricsFilesFilter {
137    fn matches(&self, file: &FileSourceMetricFile<'_>) -> bool {
138        filter_check!(self
139            .name
140            .as_ref()
141            .map(|f| f.iter().all(|f| f.filter_value(file.get_name()))));
142        true
143    }
144
145    fn or(&self) -> Option<&Vec<Self>> {
146        self.or.as_ref()
147    }
148}
149
150#[allow(clippy::too_many_arguments)]
151#[Object]
152impl FileSourceMetrics {
153    /// File metrics
154    pub async fn files(
155        &self,
156        after: Option<String>,
157        before: Option<String>,
158        first: Option<i32>,
159        last: Option<i32>,
160        filter: Option<FileSourceMetricsFilesFilter>,
161        sort: Option<Vec<sort::SortField<FileSourceMetricFilesSortFieldName>>>,
162    ) -> relay::ConnectionResult<FileSourceMetricFile<'_>> {
163        let filter = filter.unwrap_or_default();
164        let mut files = filter_items(self.get_files().into_iter(), &filter);
165
166        if let Some(sort_fields) = sort {
167            sort::by_fields(&mut files, &sort_fields);
168        }
169
170        relay::query(
171            files.into_iter(),
172            relay::Params::new(after, before, first, last),
173            10,
174        )
175        .await
176    }
177
178    /// Total received bytes for the current file source
179    pub async fn received_bytes_total(&self) -> Option<metrics::ReceivedBytesTotal> {
180        self.0.received_bytes_total()
181    }
182
183    /// Total received events for the current file source
184    pub async fn received_events_total(&self) -> Option<metrics::ReceivedEventsTotal> {
185        self.0.received_events_total()
186    }
187
188    /// Total sent events for the current file source
189    pub async fn sent_events_total(&self) -> Option<metrics::SentEventsTotal> {
190        self.0.sent_events_total()
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        api::schema::sort::SortField,
        event::{MetricKind, MetricValue},
    };

    /// Owns the metrics that a `FileSourceMetricFile` under test borrows.
    struct FileSourceMetricTest {
        name: &'static str,
        events_metric: Metric,
        bytes_metric: Metric,
    }

    impl FileSourceMetricTest {
        /// Creates a fixture with a sent-events counter and a received-bytes counter.
        fn new(name: &'static str, events_processed: f64, bytes_processed: f64) -> Self {
            let events_metric = metric("component_sent_events_total", events_processed);
            let bytes_metric = metric("component_received_bytes_total", bytes_processed);

            Self {
                name,
                events_metric,
                bytes_metric,
            }
        }

        /// Builds the file entry that borrows this fixture's metrics.
        fn get_metric(&self) -> FileSourceMetricFile<'_> {
            let borrowed = vec![&self.bytes_metric, &self.events_metric];
            FileSourceMetricFile::from_tuple((self.name.to_string(), borrowed))
        }
    }

    /// Creates an incremental counter metric with the given name and value.
    fn metric(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    /// Fixture that only differs by name; both counters are zero.
    fn by_name(name: &'static str) -> FileSourceMetricTest {
        FileSourceMetricTest::new(name, 0.00, 0.00)
    }

    /// Files "a", "b" and "c" with distinct event and byte counters.
    fn counter_fixtures() -> [FileSourceMetricTest; 3] {
        [
            FileSourceMetricTest::new("a", 1000.00, 100.00),
            FileSourceMetricTest::new("b", 500.00, 300.00),
            FileSourceMetricTest::new("c", 250.00, 200.00),
        ]
    }

    /// Sorts `inputs` on a single field and checks the resulting order of names.
    fn assert_sorted(
        inputs: [FileSourceMetricTest; 3],
        field: FileSourceMetricFilesSortFieldName,
        direction: sort::Direction,
        expected: [&str; 3],
    ) {
        let mut files: Vec<_> = inputs.iter().map(|input| input.get_metric()).collect();
        let fields = vec![SortField::<FileSourceMetricFilesSortFieldName> { field, direction }];

        sort::by_fields(&mut files, &fields);

        let sorted: Vec<&str> = files.iter().map(|file| file.name.as_str()).collect();
        assert_eq!(sorted, expected);
    }

    #[test]
    fn sort_name_asc() {
        assert_sorted(
            [
                by_name("/path/to/file/2"),
                by_name("/path/to/file/3"),
                by_name("/path/to/file/1"),
            ],
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Asc,
            ["/path/to/file/1", "/path/to/file/2", "/path/to/file/3"],
        );
    }

    #[test]
    fn sort_name_desc() {
        assert_sorted(
            [
                by_name("/path/to/file/2"),
                by_name("/path/to/file/3"),
                by_name("/path/to/file/1"),
            ],
            FileSourceMetricFilesSortFieldName::Name,
            sort::Direction::Desc,
            ["/path/to/file/3", "/path/to/file/2", "/path/to/file/1"],
        );
    }

    #[test]
    fn processed_events_asc() {
        assert_sorted(
            counter_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Asc,
            ["c", "b", "a"],
        );
    }

    #[test]
    fn processed_events_desc() {
        assert_sorted(
            counter_fixtures(),
            FileSourceMetricFilesSortFieldName::SentEventsTotal,
            sort::Direction::Desc,
            ["a", "b", "c"],
        );
    }

    #[test]
    fn received_bytes_asc() {
        assert_sorted(
            counter_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Asc,
            ["a", "c", "b"],
        );
    }

    #[test]
    fn received_bytes_desc() {
        assert_sorted(
            counter_fixtures(),
            FileSourceMetricFilesSortFieldName::ReceivedBytesTotal,
            sort::Direction::Desc,
            ["b", "c", "a"],
        );
    }
}