vector/api/schema/components/
sink.rs

1use std::cmp;
2
3use async_graphql::{Enum, InputObject, Object};
4
5use super::{source, state, transform, Component};
6use crate::{
7    api::schema::{
8        filter,
9        metrics::{self, IntoSinkMetrics},
10        sort,
11    },
12    config::{ComponentKey, Inputs, OutputId},
13    filter_check,
14};
15
16#[derive(Debug, Clone)]
17pub struct Data {
18    pub component_key: ComponentKey,
19    pub component_type: String,
20    pub inputs: Inputs<OutputId>,
21}
22
23#[derive(Debug, Clone)]
24pub struct Sink(pub Data);
25
26impl Sink {
27    pub const fn get_component_key(&self) -> &ComponentKey {
28        &self.0.component_key
29    }
30
31    pub fn get_component_type(&self) -> &str {
32        self.0.component_type.as_str()
33    }
34}
35
36#[derive(Default, InputObject)]
37pub struct SinksFilter {
38    component_id: Option<Vec<filter::StringFilter>>,
39    component_type: Option<Vec<filter::StringFilter>>,
40    or: Option<Vec<Self>>,
41}
42
43impl filter::CustomFilter<Sink> for SinksFilter {
44    fn matches(&self, sink: &Sink) -> bool {
45        filter_check!(
46            self.component_id.as_ref().map(|f| f
47                .iter()
48                .all(|f| f.filter_value(&sink.get_component_key().to_string()))),
49            self.component_type
50                .as_ref()
51                .map(|f| f.iter().all(|f| f.filter_value(sink.get_component_type())))
52        );
53        true
54    }
55
56    fn or(&self) -> Option<&Vec<Self>> {
57        self.or.as_ref()
58    }
59}
60
61#[derive(Enum, Copy, Clone, Eq, PartialEq)]
62pub enum SinksSortFieldName {
63    ComponentKey,
64    ComponentType,
65}
66
67impl sort::SortableByField<SinksSortFieldName> for Sink {
68    fn sort(&self, rhs: &Self, field: &SinksSortFieldName) -> cmp::Ordering {
69        match field {
70            SinksSortFieldName::ComponentKey => {
71                Ord::cmp(self.get_component_key(), rhs.get_component_key())
72            }
73            SinksSortFieldName::ComponentType => {
74                Ord::cmp(self.get_component_type(), rhs.get_component_type())
75            }
76        }
77    }
78}
79
80#[Object]
81impl Sink {
82    /// Sink component_id
83    pub async fn component_id(&self) -> &str {
84        self.get_component_key().id()
85    }
86
87    /// Sink type
88    pub async fn component_type(&self) -> &str {
89        self.get_component_type()
90    }
91
92    /// Source inputs
93    pub async fn sources(&self) -> Vec<source::Source> {
94        self.0
95            .inputs
96            .iter()
97            .filter_map(|output_id| match state::component_by_output_id(output_id) {
98                Some(Component::Source(s)) => Some(s),
99                _ => None,
100            })
101            .collect()
102    }
103
104    /// Transform inputs
105    pub async fn transforms(&self) -> Vec<transform::Transform> {
106        self.0
107            .inputs
108            .iter()
109            .filter_map(|output_id| match state::component_by_output_id(output_id) {
110                Some(Component::Transform(t)) => Some(t),
111                _ => None,
112            })
113            .collect()
114    }
115
116    /// Sink metrics
117    pub async fn metrics(&self) -> metrics::SinkMetrics {
118        metrics::by_component_key(self.get_component_key())
119            .into_sink_metrics(self.get_component_type())
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126
127    fn sink_fixtures() -> Vec<Sink> {
128        vec![
129            Sink(Data {
130                component_key: ComponentKey::from("webserver"),
131                component_type: "http".to_string(),
132                inputs: Inputs::default(),
133            }),
134            Sink(Data {
135                component_key: ComponentKey::from("db"),
136                component_type: "clickhouse".to_string(),
137                inputs: Inputs::default(),
138            }),
139            Sink(Data {
140                component_key: ComponentKey::from("zip_drive"),
141                component_type: "file".to_string(),
142                inputs: Inputs::default(),
143            }),
144        ]
145    }
146
147    #[test]
148    fn sort_component_id_asc() {
149        let mut sinks = sink_fixtures();
150        let fields = vec![sort::SortField::<SinksSortFieldName> {
151            field: SinksSortFieldName::ComponentKey,
152            direction: sort::Direction::Asc,
153        }];
154        sort::by_fields(&mut sinks, &fields);
155
156        for (i, component_id) in ["db", "webserver", "zip_drive"].iter().enumerate() {
157            assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
158        }
159    }
160
161    #[test]
162    fn sort_component_id_desc() {
163        let mut sinks = sink_fixtures();
164        let fields = vec![sort::SortField::<SinksSortFieldName> {
165            field: SinksSortFieldName::ComponentKey,
166            direction: sort::Direction::Desc,
167        }];
168        sort::by_fields(&mut sinks, &fields);
169
170        for (i, component_id) in ["zip_drive", "webserver", "db"].iter().enumerate() {
171            assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
172        }
173    }
174
175    #[test]
176    fn sort_component_type_asc() {
177        let mut sinks = sink_fixtures();
178        let fields = vec![sort::SortField::<SinksSortFieldName> {
179            field: SinksSortFieldName::ComponentType,
180            direction: sort::Direction::Asc,
181        }];
182        sort::by_fields(&mut sinks, &fields);
183
184        for (i, component_id) in ["db", "zip_drive", "webserver"].iter().enumerate() {
185            assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
186        }
187    }
188
189    #[test]
190    fn sort_component_type_desc() {
191        let mut sinks = sink_fixtures();
192        let fields = vec![sort::SortField::<SinksSortFieldName> {
193            field: SinksSortFieldName::ComponentType,
194            direction: sort::Direction::Desc,
195        }];
196        sort::by_fields(&mut sinks, &fields);
197
198        for (i, component_id) in ["webserver", "zip_drive", "db"].iter().enumerate() {
199            assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
200        }
201    }
202}