vector/api/schema/components/
transform.rs

1use std::cmp;
2
3use async_graphql::{Enum, InputObject, Object};
4
5use super::{sink, source, state, Component};
6use crate::{
7    api::schema::{
8        filter,
9        metrics::{self, outputs_by_component_key, IntoTransformMetrics, Output},
10        sort,
11    },
12    config::{ComponentKey, Inputs, OutputId},
13    filter_check,
14};
15
16#[derive(Debug, Clone)]
17pub struct Data {
18    pub component_key: ComponentKey,
19    pub component_type: String,
20    pub inputs: Inputs<OutputId>,
21    pub outputs: Vec<String>,
22}
23
24#[derive(Debug, Clone)]
25pub struct Transform(pub Data);
26
27impl Transform {
28    pub const fn get_component_key(&self) -> &ComponentKey {
29        &self.0.component_key
30    }
31    pub fn get_component_type(&self) -> &str {
32        self.0.component_type.as_str()
33    }
34    pub fn get_outputs(&self) -> &[String] {
35        self.0.outputs.as_ref()
36    }
37}
38
39#[derive(Enum, Copy, Clone, Eq, PartialEq)]
40pub enum TransformsSortFieldName {
41    ComponentKey,
42    ComponentType,
43}
44
45impl sort::SortableByField<TransformsSortFieldName> for Transform {
46    fn sort(&self, rhs: &Self, field: &TransformsSortFieldName) -> cmp::Ordering {
47        match field {
48            TransformsSortFieldName::ComponentKey => {
49                Ord::cmp(self.get_component_key(), rhs.get_component_key())
50            }
51            TransformsSortFieldName::ComponentType => {
52                Ord::cmp(self.get_component_type(), rhs.get_component_type())
53            }
54        }
55    }
56}
57
58#[Object]
59impl Transform {
60    /// Transform component_id
61    pub async fn component_id(&self) -> &str {
62        self.0.component_key.id()
63    }
64
65    /// Transform type
66    pub async fn component_type(&self) -> &str {
67        self.get_component_type()
68    }
69
70    /// Transform output streams
71    pub async fn outputs(&self) -> Vec<Output> {
72        outputs_by_component_key(self.get_component_key(), self.get_outputs())
73    }
74
75    /// Source inputs
76    pub async fn sources(&self) -> Vec<source::Source> {
77        self.0
78            .inputs
79            .iter()
80            .filter_map(|output_id| match state::component_by_output_id(output_id) {
81                Some(Component::Source(s)) => Some(s),
82                _ => None,
83            })
84            .collect()
85    }
86
87    /// Transform outputs
88    pub async fn transforms(&self) -> Vec<Transform> {
89        state::filter_components(|(_component_key, components)| match components {
90            Component::Transform(t)
91                if t.0.inputs.contains(&OutputId::from(&self.0.component_key)) =>
92            {
93                Some(t.clone())
94            }
95            _ => None,
96        })
97    }
98
99    /// Sink outputs
100    pub async fn sinks(&self) -> Vec<sink::Sink> {
101        state::filter_components(|(_component_key, components)| match components {
102            Component::Sink(s) if s.0.inputs.contains(&OutputId::from(&self.0.component_key)) => {
103                Some(s.clone())
104            }
105            _ => None,
106        })
107    }
108
109    /// Transform metrics
110    pub async fn metrics(&self) -> metrics::TransformMetrics {
111        metrics::by_component_key(&self.0.component_key)
112            .into_transform_metrics(self.get_component_type())
113    }
114}
115
116#[derive(Default, InputObject)]
117pub struct TransformsFilter {
118    component_id: Option<Vec<filter::StringFilter>>,
119    component_type: Option<Vec<filter::StringFilter>>,
120    or: Option<Vec<Self>>,
121}
122
123impl filter::CustomFilter<Transform> for TransformsFilter {
124    fn matches(&self, transform: &Transform) -> bool {
125        filter_check!(
126            self.component_id.as_ref().map(|f| f
127                .iter()
128                .all(|f| f.filter_value(&transform.get_component_key().to_string()))),
129            self.component_type.as_ref().map(|f| f
130                .iter()
131                .all(|f| f.filter_value(transform.get_component_type())))
132        );
133        true
134    }
135
136    fn or(&self) -> Option<&Vec<Self>> {
137        self.or.as_ref()
138    }
139}
140
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds three transforms with distinct keys and types, with no inputs or outputs.
    fn transform_fixtures() -> Vec<Transform> {
        // (component key, component type)
        [
            ("parse_json", "json"),
            ("field_adder", "add_fields"),
            ("append", "concat"),
        ]
        .iter()
        .map(|&(key, kind)| {
            Transform(Data {
                component_key: ComponentKey::from(key),
                component_type: kind.to_string(),
                inputs: Inputs::default(),
                outputs: Vec::new(),
            })
        })
        .collect()
    }

    /// Sorts the fixtures by one field and direction, returning the resulting
    /// component keys rendered as strings.
    fn sorted_component_ids(
        field: TransformsSortFieldName,
        direction: sort::Direction,
    ) -> Vec<String> {
        let mut transforms = transform_fixtures();
        let fields = vec![sort::SortField::<TransformsSortFieldName> { field, direction }];
        sort::by_fields(&mut transforms, &fields);

        transforms
            .iter()
            .map(|transform| transform.get_component_key().to_string())
            .collect()
    }

    #[test]
    fn sort_component_id_asc() {
        let ids = sorted_component_ids(TransformsSortFieldName::ComponentKey, sort::Direction::Asc);
        assert_eq!(ids, ["append", "field_adder", "parse_json"]);
    }

    #[test]
    fn sort_component_id_desc() {
        let ids =
            sorted_component_ids(TransformsSortFieldName::ComponentKey, sort::Direction::Desc);
        assert_eq!(ids, ["parse_json", "field_adder", "append"]);
    }

    #[test]
    fn sort_component_type_asc() {
        let ids =
            sorted_component_ids(TransformsSortFieldName::ComponentType, sort::Direction::Asc);
        assert_eq!(ids, ["field_adder", "append", "parse_json"]);
    }

    #[test]
    fn sort_component_type_desc() {
        let ids =
            sorted_component_ids(TransformsSortFieldName::ComponentType, sort::Direction::Desc);
        assert_eq!(ids, ["parse_json", "append", "field_adder"]);
    }
}