vector_api_client/
client.rs

1use http::Uri;
2use tokio_stream::{Stream, StreamExt};
3use tonic::transport::{Channel, Endpoint};
4use tonic_health::pb::{
5    HealthCheckRequest, health_check_response::ServingStatus, health_client::HealthClient,
6};
7
8use crate::{
9    error::{Error, Result},
10    proto::{
11        GetAllocationTracingStatusRequest, GetAllocationTracingStatusResponse,
12        GetComponentsRequest, GetComponentsResponse, GetMetaRequest, GetMetaResponse, MetricName,
13        StreamComponentAllocatedBytesRequest, StreamComponentAllocatedBytesResponse,
14        StreamComponentMetricsRequest, StreamComponentMetricsResponse, StreamHeartbeatRequest,
15        StreamHeartbeatResponse, StreamOutputEventsRequest, StreamOutputEventsResponse,
16        StreamUptimeRequest, StreamUptimeResponse,
17        observability_service_client::ObservabilityServiceClient,
18    },
19};
20
21/// gRPC client for the Vector observability API
22#[derive(Debug, Clone)]
23pub struct Client {
24    endpoint: Endpoint,
25    channel: Option<Channel>,
26    client: Option<ObservabilityServiceClient<Channel>>,
27}
28
29impl Client {
30    /// Create a new gRPC client
31    ///
32    /// The client is not connected until `connect()` is called.
33    ///
34    /// # Arguments
35    ///
36    /// * `uri` - The gRPC server URI (e.g., `"http://localhost:9999".parse().unwrap()`)
37    pub fn new(uri: Uri) -> Self {
38        Self {
39            endpoint: Endpoint::from(uri),
40            channel: None,
41            client: None,
42        }
43    }
44
45    /// Connect to the gRPC server
46    pub async fn connect(&mut self) -> Result<()> {
47        let channel = self.endpoint.connect().await?;
48        self.client = Some(ObservabilityServiceClient::new(channel.clone()));
49        self.channel = Some(channel);
50        Ok(())
51    }
52
53    /// Ensure the client is connected
54    fn ensure_connected(&mut self) -> Result<&mut ObservabilityServiceClient<Channel>> {
55        self.client.as_mut().ok_or(Error::NotConnected)
56    }
57
58    /// Get the underlying channel
59    fn channel(&self) -> Result<&Channel> {
60        self.channel.as_ref().ok_or(Error::NotConnected)
61    }
62
63    // ========== Unary RPCs ==========
64
65    /// Check if the API server is healthy using the standard gRPC health check
66    /// protocol (grpc.health.v1.Health/Check).
67    ///
68    /// Queries the empty service name (`""`), which represents whole-server
69    /// health. This is the default used by Kubernetes gRPC probes and
70    /// `grpc-health-probe`.
71    ///
72    /// Returns `Ok(())` if the server is `SERVING`, or an error otherwise.
73    pub async fn health(&mut self) -> Result<()> {
74        let channel = self.channel()?.clone();
75        let mut health_client = HealthClient::new(channel);
76        let response = health_client
77            .check(HealthCheckRequest {
78                service: String::new(),
79            })
80            .await?;
81        let status = response.into_inner().status;
82        if status != ServingStatus::Serving as i32 {
83            return Err(Error::NotServing { status });
84        }
85        Ok(())
86    }
87
88    /// Get metadata about the Vector instance
89    pub async fn get_meta(&mut self) -> Result<GetMetaResponse> {
90        let client = self.ensure_connected()?;
91        let response = client.get_meta(GetMetaRequest {}).await?;
92        Ok(response.into_inner())
93    }
94
95    /// Get information about configured components
96    ///
97    /// # Arguments
98    ///
99    /// * `limit` - Maximum number of components to return (0 = no limit)
100    pub async fn get_components(&mut self, limit: i32) -> Result<GetComponentsResponse> {
101        let client = self.ensure_connected()?;
102        let response = client
103            .get_components(GetComponentsRequest { limit })
104            .await?;
105        Ok(response.into_inner())
106    }
107
108    /// Check whether allocation tracing is active on the connected Vector instance
109    pub async fn get_allocation_tracing_status(
110        &mut self,
111    ) -> Result<GetAllocationTracingStatusResponse> {
112        let client = self.ensure_connected()?;
113        let response = client
114            .get_allocation_tracing_status(GetAllocationTracingStatusRequest {})
115            .await?;
116        Ok(response.into_inner())
117    }
118
119    // ========== Streaming RPCs ==========
120
121    /// Stream periodic heartbeat timestamps
122    ///
123    /// # Arguments
124    ///
125    /// * `interval_ms` - Update interval in milliseconds
126    pub async fn stream_heartbeat(
127        &mut self,
128        interval_ms: i32,
129    ) -> Result<impl Stream<Item = Result<StreamHeartbeatResponse>>> {
130        let client = self.ensure_connected()?;
131        let response = client
132            .stream_heartbeat(StreamHeartbeatRequest { interval_ms })
133            .await?;
134        Ok(response.into_inner().map(|r| r.map_err(Error::from)))
135    }
136
137    /// Stream uptime in seconds
138    ///
139    /// # Arguments
140    ///
141    /// * `interval_ms` - Update interval in milliseconds
142    pub async fn stream_uptime(
143        &mut self,
144        interval_ms: i32,
145    ) -> Result<impl Stream<Item = Result<StreamUptimeResponse>>> {
146        let client = self.ensure_connected()?;
147        let response = client
148            .stream_uptime(StreamUptimeRequest { interval_ms })
149            .await?;
150        Ok(response.into_inner().map(|r| r.map_err(Error::from)))
151    }
152
153    /// Stream memory allocated per component
154    ///
155    /// # Arguments
156    ///
157    /// * `interval_ms` - Update interval in milliseconds
158    pub async fn stream_component_allocated_bytes(
159        &mut self,
160        interval_ms: i32,
161    ) -> Result<impl Stream<Item = Result<StreamComponentAllocatedBytesResponse>>> {
162        let client = self.ensure_connected()?;
163        let response = client
164            .stream_component_allocated_bytes(StreamComponentAllocatedBytesRequest { interval_ms })
165            .await?;
166        Ok(response.into_inner().map(|r| r.map_err(Error::from)))
167    }
168
169    /// Stream per-component metrics for a chosen metric name.
170    ///
171    /// # Arguments
172    ///
173    /// * `metric` - Which metric to stream
174    /// * `interval_ms` - Update interval in milliseconds
175    pub async fn stream_component_metrics(
176        &mut self,
177        metric: MetricName,
178        interval_ms: i32,
179    ) -> Result<impl Stream<Item = Result<StreamComponentMetricsResponse>>> {
180        let client = self.ensure_connected()?;
181        let response = client
182            .stream_component_metrics(StreamComponentMetricsRequest {
183                interval_ms,
184                metric: metric as i32,
185            })
186            .await?;
187        Ok(response.into_inner().map(|r| r.map_err(Error::from)))
188    }
189
190    /// Stream events from components matching patterns
191    ///
192    /// This is used by `vector tap` to capture events.
193    pub async fn stream_output_events(
194        &mut self,
195        request: StreamOutputEventsRequest,
196    ) -> Result<impl Stream<Item = Result<StreamOutputEventsResponse>> + use<>> {
197        let client = self.ensure_connected()?;
198        let response = client.stream_output_events(request).await?;
199        Ok(response.into_inner().map(|r| r.map_err(Error::from)))
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use super::*;
206
207    #[tokio::test]
208    async fn test_not_connected_error() {
209        let mut client = Client::new("http://localhost:9999".parse().unwrap());
210        let result = client.health().await;
211        assert!(matches!(result, Err(Error::NotConnected)));
212    }
213
214    #[test]
215    fn test_ensure_connected() {
216        let mut client = Client::new("http://localhost:9999".parse().unwrap());
217        let result = client.ensure_connected();
218        assert!(result.is_err());
219        assert!(matches!(result.unwrap_err(), Error::NotConnected));
220    }
221}