vector_api_client/
client.rs1use http::Uri;
2use tokio_stream::{Stream, StreamExt};
3use tonic::transport::{Channel, Endpoint};
4use tonic_health::pb::{
5 HealthCheckRequest, health_check_response::ServingStatus, health_client::HealthClient,
6};
7
8use crate::{
9 error::{Error, Result},
10 proto::{
11 GetAllocationTracingStatusRequest, GetAllocationTracingStatusResponse,
12 GetComponentsRequest, GetComponentsResponse, GetMetaRequest, GetMetaResponse, MetricName,
13 StreamComponentAllocatedBytesRequest, StreamComponentAllocatedBytesResponse,
14 StreamComponentMetricsRequest, StreamComponentMetricsResponse, StreamHeartbeatRequest,
15 StreamHeartbeatResponse, StreamOutputEventsRequest, StreamOutputEventsResponse,
16 StreamUptimeRequest, StreamUptimeResponse,
17 observability_service_client::ObservabilityServiceClient,
18 },
19};
20
21#[derive(Debug, Clone)]
23pub struct Client {
24 endpoint: Endpoint,
25 channel: Option<Channel>,
26 client: Option<ObservabilityServiceClient<Channel>>,
27}
28
29impl Client {
30 pub fn new(uri: Uri) -> Self {
38 Self {
39 endpoint: Endpoint::from(uri),
40 channel: None,
41 client: None,
42 }
43 }
44
45 pub async fn connect(&mut self) -> Result<()> {
47 let channel = self.endpoint.connect().await?;
48 self.client = Some(ObservabilityServiceClient::new(channel.clone()));
49 self.channel = Some(channel);
50 Ok(())
51 }
52
53 fn ensure_connected(&mut self) -> Result<&mut ObservabilityServiceClient<Channel>> {
55 self.client.as_mut().ok_or(Error::NotConnected)
56 }
57
58 fn channel(&self) -> Result<&Channel> {
60 self.channel.as_ref().ok_or(Error::NotConnected)
61 }
62
63 pub async fn health(&mut self) -> Result<()> {
74 let channel = self.channel()?.clone();
75 let mut health_client = HealthClient::new(channel);
76 let response = health_client
77 .check(HealthCheckRequest {
78 service: String::new(),
79 })
80 .await?;
81 let status = response.into_inner().status;
82 if status != ServingStatus::Serving as i32 {
83 return Err(Error::NotServing { status });
84 }
85 Ok(())
86 }
87
88 pub async fn get_meta(&mut self) -> Result<GetMetaResponse> {
90 let client = self.ensure_connected()?;
91 let response = client.get_meta(GetMetaRequest {}).await?;
92 Ok(response.into_inner())
93 }
94
95 pub async fn get_components(&mut self, limit: i32) -> Result<GetComponentsResponse> {
101 let client = self.ensure_connected()?;
102 let response = client
103 .get_components(GetComponentsRequest { limit })
104 .await?;
105 Ok(response.into_inner())
106 }
107
108 pub async fn get_allocation_tracing_status(
110 &mut self,
111 ) -> Result<GetAllocationTracingStatusResponse> {
112 let client = self.ensure_connected()?;
113 let response = client
114 .get_allocation_tracing_status(GetAllocationTracingStatusRequest {})
115 .await?;
116 Ok(response.into_inner())
117 }
118
119 pub async fn stream_heartbeat(
127 &mut self,
128 interval_ms: i32,
129 ) -> Result<impl Stream<Item = Result<StreamHeartbeatResponse>>> {
130 let client = self.ensure_connected()?;
131 let response = client
132 .stream_heartbeat(StreamHeartbeatRequest { interval_ms })
133 .await?;
134 Ok(response.into_inner().map(|r| r.map_err(Error::from)))
135 }
136
137 pub async fn stream_uptime(
143 &mut self,
144 interval_ms: i32,
145 ) -> Result<impl Stream<Item = Result<StreamUptimeResponse>>> {
146 let client = self.ensure_connected()?;
147 let response = client
148 .stream_uptime(StreamUptimeRequest { interval_ms })
149 .await?;
150 Ok(response.into_inner().map(|r| r.map_err(Error::from)))
151 }
152
153 pub async fn stream_component_allocated_bytes(
159 &mut self,
160 interval_ms: i32,
161 ) -> Result<impl Stream<Item = Result<StreamComponentAllocatedBytesResponse>>> {
162 let client = self.ensure_connected()?;
163 let response = client
164 .stream_component_allocated_bytes(StreamComponentAllocatedBytesRequest { interval_ms })
165 .await?;
166 Ok(response.into_inner().map(|r| r.map_err(Error::from)))
167 }
168
169 pub async fn stream_component_metrics(
176 &mut self,
177 metric: MetricName,
178 interval_ms: i32,
179 ) -> Result<impl Stream<Item = Result<StreamComponentMetricsResponse>>> {
180 let client = self.ensure_connected()?;
181 let response = client
182 .stream_component_metrics(StreamComponentMetricsRequest {
183 interval_ms,
184 metric: metric as i32,
185 })
186 .await?;
187 Ok(response.into_inner().map(|r| r.map_err(Error::from)))
188 }
189
190 pub async fn stream_output_events(
194 &mut self,
195 request: StreamOutputEventsRequest,
196 ) -> Result<impl Stream<Item = Result<StreamOutputEventsResponse>> + use<>> {
197 let client = self.ensure_connected()?;
198 let response = client.stream_output_events(request).await?;
199 Ok(response.into_inner().map(|r| r.map_err(Error::from)))
200 }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a client that has never been connected.
    fn disconnected_client() -> Client {
        Client::new("http://localhost:9999".parse().unwrap())
    }

    /// A health check on a client without a channel must report
    /// `NotConnected` instead of attempting any request.
    #[tokio::test]
    async fn test_not_connected_error() {
        let mut client = disconnected_client();
        let outcome = client.health().await;
        assert!(matches!(outcome, Err(Error::NotConnected)));
    }

    /// The internal accessor must refuse to hand out a service client before
    /// `connect` has been called.
    #[test]
    fn test_ensure_connected() {
        let mut client = disconnected_client();
        let err = client
            .ensure_connected()
            .expect_err("a never-connected client must not yield a service client");
        assert!(matches!(err, Error::NotConnected));
    }
}