vector/api/schema/components/
sink.rs1use std::cmp;
2
3use async_graphql::{Enum, InputObject, Object};
4
5use super::{source, state, transform, Component};
6use crate::{
7 api::schema::{
8 filter,
9 metrics::{self, IntoSinkMetrics},
10 sort,
11 },
12 config::{ComponentKey, Inputs, OutputId},
13 filter_check,
14};
15
16#[derive(Debug, Clone)]
17pub struct Data {
18 pub component_key: ComponentKey,
19 pub component_type: String,
20 pub inputs: Inputs<OutputId>,
21}
22
23#[derive(Debug, Clone)]
24pub struct Sink(pub Data);
25
26impl Sink {
27 pub const fn get_component_key(&self) -> &ComponentKey {
28 &self.0.component_key
29 }
30
31 pub fn get_component_type(&self) -> &str {
32 self.0.component_type.as_str()
33 }
34}
35
36#[derive(Default, InputObject)]
37pub struct SinksFilter {
38 component_id: Option<Vec<filter::StringFilter>>,
39 component_type: Option<Vec<filter::StringFilter>>,
40 or: Option<Vec<Self>>,
41}
42
43impl filter::CustomFilter<Sink> for SinksFilter {
44 fn matches(&self, sink: &Sink) -> bool {
45 filter_check!(
46 self.component_id.as_ref().map(|f| f
47 .iter()
48 .all(|f| f.filter_value(&sink.get_component_key().to_string()))),
49 self.component_type
50 .as_ref()
51 .map(|f| f.iter().all(|f| f.filter_value(sink.get_component_type())))
52 );
53 true
54 }
55
56 fn or(&self) -> Option<&Vec<Self>> {
57 self.or.as_ref()
58 }
59}
60
61#[derive(Enum, Copy, Clone, Eq, PartialEq)]
62pub enum SinksSortFieldName {
63 ComponentKey,
64 ComponentType,
65}
66
67impl sort::SortableByField<SinksSortFieldName> for Sink {
68 fn sort(&self, rhs: &Self, field: &SinksSortFieldName) -> cmp::Ordering {
69 match field {
70 SinksSortFieldName::ComponentKey => {
71 Ord::cmp(self.get_component_key(), rhs.get_component_key())
72 }
73 SinksSortFieldName::ComponentType => {
74 Ord::cmp(self.get_component_type(), rhs.get_component_type())
75 }
76 }
77 }
78}
79
80#[Object]
81impl Sink {
82 pub async fn component_id(&self) -> &str {
84 self.get_component_key().id()
85 }
86
87 pub async fn component_type(&self) -> &str {
89 self.get_component_type()
90 }
91
92 pub async fn sources(&self) -> Vec<source::Source> {
94 self.0
95 .inputs
96 .iter()
97 .filter_map(|output_id| match state::component_by_output_id(output_id) {
98 Some(Component::Source(s)) => Some(s),
99 _ => None,
100 })
101 .collect()
102 }
103
104 pub async fn transforms(&self) -> Vec<transform::Transform> {
106 self.0
107 .inputs
108 .iter()
109 .filter_map(|output_id| match state::component_by_output_id(output_id) {
110 Some(Component::Transform(t)) => Some(t),
111 _ => None,
112 })
113 .collect()
114 }
115
116 pub async fn metrics(&self) -> metrics::SinkMetrics {
118 metrics::by_component_key(self.get_component_key())
119 .into_sink_metrics(self.get_component_type())
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126
127 fn sink_fixtures() -> Vec<Sink> {
128 vec![
129 Sink(Data {
130 component_key: ComponentKey::from("webserver"),
131 component_type: "http".to_string(),
132 inputs: Inputs::default(),
133 }),
134 Sink(Data {
135 component_key: ComponentKey::from("db"),
136 component_type: "clickhouse".to_string(),
137 inputs: Inputs::default(),
138 }),
139 Sink(Data {
140 component_key: ComponentKey::from("zip_drive"),
141 component_type: "file".to_string(),
142 inputs: Inputs::default(),
143 }),
144 ]
145 }
146
147 #[test]
148 fn sort_component_id_asc() {
149 let mut sinks = sink_fixtures();
150 let fields = vec![sort::SortField::<SinksSortFieldName> {
151 field: SinksSortFieldName::ComponentKey,
152 direction: sort::Direction::Asc,
153 }];
154 sort::by_fields(&mut sinks, &fields);
155
156 for (i, component_id) in ["db", "webserver", "zip_drive"].iter().enumerate() {
157 assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
158 }
159 }
160
161 #[test]
162 fn sort_component_id_desc() {
163 let mut sinks = sink_fixtures();
164 let fields = vec![sort::SortField::<SinksSortFieldName> {
165 field: SinksSortFieldName::ComponentKey,
166 direction: sort::Direction::Desc,
167 }];
168 sort::by_fields(&mut sinks, &fields);
169
170 for (i, component_id) in ["zip_drive", "webserver", "db"].iter().enumerate() {
171 assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
172 }
173 }
174
175 #[test]
176 fn sort_component_type_asc() {
177 let mut sinks = sink_fixtures();
178 let fields = vec![sort::SortField::<SinksSortFieldName> {
179 field: SinksSortFieldName::ComponentType,
180 direction: sort::Direction::Asc,
181 }];
182 sort::by_fields(&mut sinks, &fields);
183
184 for (i, component_id) in ["db", "zip_drive", "webserver"].iter().enumerate() {
185 assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
186 }
187 }
188
189 #[test]
190 fn sort_component_type_desc() {
191 let mut sinks = sink_fixtures();
192 let fields = vec![sort::SortField::<SinksSortFieldName> {
193 field: SinksSortFieldName::ComponentType,
194 direction: sort::Direction::Desc,
195 }];
196 sort::by_fields(&mut sinks, &fields);
197
198 for (i, component_id) in ["webserver", "zip_drive", "db"].iter().enumerate() {
199 assert_eq!(sinks[i].get_component_key().to_string(), *component_id);
200 }
201 }
202}