vector_api_client/gql/
components.rs

1use std::fmt;
2
3use graphql_client::GraphQLQuery;
4
5use crate::{BoxedSubscription, QueryResult};
6
7/// Components query for returning sources, transforms, and sinks
8#[derive(GraphQLQuery, Debug, Copy, Clone)]
9#[graphql(
10    schema_path = "graphql/schema.json",
11    query_path = "graphql/queries/components.graphql",
12    response_derives = "Debug"
13)]
14pub struct ComponentsQuery;
15
16/// Components subscription for notification when a component has been added
17#[derive(GraphQLQuery, Debug, Copy, Clone)]
18#[graphql(
19    schema_path = "graphql/schema.json",
20    query_path = "graphql/subscriptions/component_added.graphql",
21    response_derives = "Debug"
22)]
23pub struct ComponentAddedSubscription;
24
25/// Components subscription for notification when a component has been removed
26#[derive(GraphQLQuery, Debug, Copy, Clone)]
27#[graphql(
28    schema_path = "graphql/schema.json",
29    query_path = "graphql/subscriptions/component_removed.graphql",
30    response_derives = "Debug"
31)]
32pub struct ComponentRemovedSubscription;
33
34pub trait ComponentsQueryExt {
35    async fn components_query(&self, first: i64) -> crate::QueryResult<ComponentsQuery>;
36}
37
38impl ComponentsQueryExt for crate::Client {
39    async fn components_query(&self, first: i64) -> QueryResult<ComponentsQuery> {
40        let request_body = ComponentsQuery::build_query(components_query::Variables { first });
41        self.query::<ComponentsQuery>(&request_body).await
42    }
43}
44
45pub trait ComponentsSubscriptionExt {
46    fn component_added(&self) -> crate::BoxedSubscription<ComponentAddedSubscription>;
47    fn component_removed(&self) -> crate::BoxedSubscription<ComponentRemovedSubscription>;
48}
49
50impl ComponentsSubscriptionExt for crate::SubscriptionClient {
51    /// Subscription for when a component has been added
52    fn component_added(&self) -> BoxedSubscription<ComponentAddedSubscription> {
53        let request_body =
54            ComponentAddedSubscription::build_query(component_added_subscription::Variables);
55
56        self.start::<ComponentAddedSubscription>(&request_body)
57    }
58
59    /// Subscription for when a component has been removed
60    fn component_removed(&self) -> BoxedSubscription<ComponentRemovedSubscription> {
61        let request_body =
62            ComponentRemovedSubscription::build_query(component_removed_subscription::Variables);
63
64        self.start::<ComponentRemovedSubscription>(&request_body)
65    }
66}
67
68impl components_query::ComponentsQueryComponentsEdgesNodeOn {
69    pub fn received_bytes_total(&self) -> i64 {
70        // This is network bytes received, and only sources can receive events.
71        match self {
72            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s
73                .metrics
74                .received_bytes_total
75                .as_ref()
76                .map(|p| p.received_bytes_total as i64)
77                .unwrap_or(0),
78            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(_) => 0,
79            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(_) => 0,
80        }
81    }
82
83    pub fn received_events_total(&self) -> i64 {
84        match self {
85            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s
86                .metrics
87                .received_events_total
88                .as_ref()
89                .map(|p| p.received_events_total as i64)
90                .unwrap_or(0),
91            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(t) => t
92                .metrics
93                .received_events_total
94                .as_ref()
95                .map(|p| p.received_events_total as i64)
96                .unwrap_or(0),
97            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(s) => s
98                .metrics
99                .received_events_total
100                .as_ref()
101                .map(|p| p.received_events_total as i64)
102                .unwrap_or(0),
103        }
104    }
105
106    pub fn sent_bytes_total(&self) -> i64 {
107        // This is network bytes sent, and only sinks can send out events.
108        match self {
109            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(_) => 0,
110            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(_) => 0,
111            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(s) => s
112                .metrics
113                .sent_bytes_total
114                .as_ref()
115                .map(|p| p.sent_bytes_total as i64)
116                .unwrap_or(0),
117        }
118    }
119
120    pub fn sent_events_total(&self) -> i64 {
121        match self {
122            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s
123                .metrics
124                .sent_events_total
125                .as_ref()
126                .map(|p| p.sent_events_total as i64)
127                .unwrap_or(0),
128            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(t) => t
129                .metrics
130                .sent_events_total
131                .as_ref()
132                .map(|p| p.sent_events_total as i64)
133                .unwrap_or(0),
134            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(s) => s
135                .metrics
136                .sent_events_total
137                .as_ref()
138                .map(|p| p.sent_events_total as i64)
139                .unwrap_or(0),
140        }
141    }
142
143    pub fn outputs(&self) -> Vec<(String, i64)> {
144        match self {
145            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(s) => s
146                .outputs
147                .iter()
148                .map(|o| {
149                    (
150                        o.output_id.clone(),
151                        o.sent_events_total
152                            .as_ref()
153                            .map(|p| p.sent_events_total as i64)
154                            .unwrap_or(0),
155                    )
156                })
157                .collect(),
158            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(t) => t
159                .outputs
160                .iter()
161                .map(|o| {
162                    (
163                        o.output_id.clone(),
164                        o.sent_events_total
165                            .as_ref()
166                            .map(|p| p.sent_events_total as i64)
167                            .unwrap_or(0),
168                    )
169                })
170                .collect(),
171            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(_) => vec![],
172        }
173    }
174}
175
176impl fmt::Display for components_query::ComponentsQueryComponentsEdgesNodeOn {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        let res = match self {
179            components_query::ComponentsQueryComponentsEdgesNodeOn::Source(_) => "source",
180            components_query::ComponentsQueryComponentsEdgesNodeOn::Transform(_) => "transform",
181            components_query::ComponentsQueryComponentsEdgesNodeOn::Sink(_) => "sink",
182        };
183
184        write!(f, "{res}")
185    }
186}
187
188impl fmt::Display for component_added_subscription::ComponentAddedSubscriptionComponentAddedOn {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        let res = match self {
191            component_added_subscription::ComponentAddedSubscriptionComponentAddedOn::Source => {
192                "source"
193            }
194            component_added_subscription::ComponentAddedSubscriptionComponentAddedOn::Transform => {
195                "transform"
196            }
197            component_added_subscription::ComponentAddedSubscriptionComponentAddedOn::Sink => {
198                "sink"
199            }
200        };
201
202        write!(f, "{res}")
203    }
204}
205
206impl fmt::Display
207    for component_removed_subscription::ComponentRemovedSubscriptionComponentRemovedOn
208{
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        let res = match self {
211            component_removed_subscription::ComponentRemovedSubscriptionComponentRemovedOn::Source => {
212                "source"
213            }
214            component_removed_subscription::ComponentRemovedSubscriptionComponentRemovedOn::Transform => {
215                "transform"
216            }
217            component_removed_subscription::ComponentRemovedSubscriptionComponentRemovedOn::Sink => {
218                "sink"
219            }
220        };
221
222        write!(f, "{res}")
223    }
224}