// vector/api/schema/components/transform.rs
use std::cmp;

use async_graphql::{Enum, InputObject, Object};

use super::{sink, source, state, Component};
use crate::{
    api::schema::{
        filter,
        metrics::{self, outputs_by_component_key, IntoTransformMetrics, Output},
        sort,
    },
    config::{ComponentKey, Inputs, OutputId},
    filter_check,
};
15
16#[derive(Debug, Clone)]
17pub struct Data {
18 pub component_key: ComponentKey,
19 pub component_type: String,
20 pub inputs: Inputs<OutputId>,
21 pub outputs: Vec<String>,
22}
23
24#[derive(Debug, Clone)]
25pub struct Transform(pub Data);
26
27impl Transform {
28 pub const fn get_component_key(&self) -> &ComponentKey {
29 &self.0.component_key
30 }
31 pub fn get_component_type(&self) -> &str {
32 self.0.component_type.as_str()
33 }
34 pub fn get_outputs(&self) -> &[String] {
35 self.0.outputs.as_ref()
36 }
37}
38
39#[derive(Enum, Copy, Clone, Eq, PartialEq)]
40pub enum TransformsSortFieldName {
41 ComponentKey,
42 ComponentType,
43}
44
45impl sort::SortableByField<TransformsSortFieldName> for Transform {
46 fn sort(&self, rhs: &Self, field: &TransformsSortFieldName) -> cmp::Ordering {
47 match field {
48 TransformsSortFieldName::ComponentKey => {
49 Ord::cmp(self.get_component_key(), rhs.get_component_key())
50 }
51 TransformsSortFieldName::ComponentType => {
52 Ord::cmp(self.get_component_type(), rhs.get_component_type())
53 }
54 }
55 }
56}
57
58#[Object]
59impl Transform {
60 pub async fn component_id(&self) -> &str {
62 self.0.component_key.id()
63 }
64
65 pub async fn component_type(&self) -> &str {
67 self.get_component_type()
68 }
69
70 pub async fn outputs(&self) -> Vec<Output> {
72 outputs_by_component_key(self.get_component_key(), self.get_outputs())
73 }
74
75 pub async fn sources(&self) -> Vec<source::Source> {
77 self.0
78 .inputs
79 .iter()
80 .filter_map(|output_id| match state::component_by_output_id(output_id) {
81 Some(Component::Source(s)) => Some(s),
82 _ => None,
83 })
84 .collect()
85 }
86
87 pub async fn transforms(&self) -> Vec<Transform> {
89 state::filter_components(|(_component_key, components)| match components {
90 Component::Transform(t)
91 if t.0.inputs.contains(&OutputId::from(&self.0.component_key)) =>
92 {
93 Some(t.clone())
94 }
95 _ => None,
96 })
97 }
98
99 pub async fn sinks(&self) -> Vec<sink::Sink> {
101 state::filter_components(|(_component_key, components)| match components {
102 Component::Sink(s) if s.0.inputs.contains(&OutputId::from(&self.0.component_key)) => {
103 Some(s.clone())
104 }
105 _ => None,
106 })
107 }
108
109 pub async fn metrics(&self) -> metrics::TransformMetrics {
111 metrics::by_component_key(&self.0.component_key)
112 .into_transform_metrics(self.get_component_type())
113 }
114}
115
116#[derive(Default, InputObject)]
117pub struct TransformsFilter {
118 component_id: Option<Vec<filter::StringFilter>>,
119 component_type: Option<Vec<filter::StringFilter>>,
120 or: Option<Vec<Self>>,
121}
122
123impl filter::CustomFilter<Transform> for TransformsFilter {
124 fn matches(&self, transform: &Transform) -> bool {
125 filter_check!(
126 self.component_id.as_ref().map(|f| f
127 .iter()
128 .all(|f| f.filter_value(&transform.get_component_key().to_string()))),
129 self.component_type.as_ref().map(|f| f
130 .iter()
131 .all(|f| f.filter_value(transform.get_component_type())))
132 );
133 true
134 }
135
136 fn or(&self) -> Option<&Vec<Self>> {
137 self.or.as_ref()
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds three transforms whose component keys and component types sort into different
    /// orders, so the tests can tell which field was actually used.
    fn transform_fixtures() -> Vec<Transform> {
        [
            ("parse_json", "json"),
            ("field_adder", "add_fields"),
            ("append", "concat"),
        ]
        .iter()
        .map(|&(key, component_type)| {
            Transform(Data {
                component_key: ComponentKey::from(key),
                component_type: component_type.to_string(),
                inputs: Inputs::default(),
                outputs: vec![],
            })
        })
        .collect()
    }

    /// Sorts the fixtures by a single `field` in `direction` and asserts that the resulting
    /// component keys equal `expected`, in order.
    fn assert_sorted_keys(
        field: TransformsSortFieldName,
        direction: sort::Direction,
        expected: &[&str],
    ) {
        let mut transforms = transform_fixtures();
        let fields = vec![sort::SortField::<TransformsSortFieldName> { field, direction }];
        sort::by_fields(&mut transforms, &fields);

        // Comparing the whole sequence also checks its length and prints every key on failure.
        let actual: Vec<String> = transforms
            .iter()
            .map(|transform| transform.get_component_key().to_string())
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn sort_component_id_asc() {
        assert_sorted_keys(
            TransformsSortFieldName::ComponentKey,
            sort::Direction::Asc,
            &["append", "field_adder", "parse_json"],
        );
    }

    #[test]
    fn sort_component_id_desc() {
        assert_sorted_keys(
            TransformsSortFieldName::ComponentKey,
            sort::Direction::Desc,
            &["parse_json", "field_adder", "append"],
        );
    }

    #[test]
    fn sort_component_type_asc() {
        // Types in ascending order: "add_fields", "concat", "json".
        assert_sorted_keys(
            TransformsSortFieldName::ComponentType,
            sort::Direction::Asc,
            &["field_adder", "append", "parse_json"],
        );
    }

    #[test]
    fn sort_component_type_desc() {
        // Types in descending order: "json", "concat", "add_fields".
        assert_sorted_keys(
            TransformsSortFieldName::ComponentType,
            sort::Direction::Desc,
            &["parse_json", "append", "field_adder"],
        );
    }
}