// vector/api/grpc_server.rs

1use std::{
2    error::Error as StdError,
3    net::SocketAddr,
4    sync::{
5        Arc,
6        atomic::{AtomicBool, Ordering},
7    },
8};
9
10use axum::{
11    Router,
12    extract::State,
13    http::{StatusCode, header},
14    response::IntoResponse,
15    routing::get,
16};
17use tokio::sync::oneshot;
18use tonic::transport::Server as TonicServer;
19use tonic_health::server::{HealthReporter, health_reporter};
20use vector_lib::tap::topology::WatchRx;
21
22use super::grpc::ObservabilityService;
23use crate::{config::Config, proto::observability::Server as ObservabilityServer};
24
/// Shared flag backing the HTTP `/health` endpoint. Mirrors the gRPC
/// `HealthReporter` serving status so HTTP and gRPC probes agree.
/// Cloned (via `Arc`) into the axum router state; flipped to `false` by
/// `GrpcServer::set_not_serving`.
type ServingState = Arc<AtomicBool>;
28
/// gRPC API server for Vector observability.
pub struct GrpcServer {
    // Held only for its Drop: closing this sender resolves the receiver in
    // the spawned server task, triggering graceful shutdown.
    _shutdown: oneshot::Sender<()>,
    // Controls the grpc.health.v1.Health serving status.
    health_reporter: HealthReporter,
    // Backs the HTTP `/health` endpoint; kept in sync with `health_reporter`.
    serving: ServingState,
    // The address actually bound (resolved from the configured address).
    addr: SocketAddr,
}
36
37impl GrpcServer {
38    /// Start the gRPC API server.
39    ///
40    /// This creates a new gRPC server listening on the configured address and spawns
41    /// it in the background. The server will shut down gracefully when this struct
42    /// is dropped.
43    ///
44    /// Returns an error if the server fails to bind to the configured address.
45    pub async fn start(config: &Config, watch_rx: WatchRx) -> crate::Result<Self> {
46        let addr = config.api.address.ok_or_else(|| {
47            crate::Error::from("API address not configured in config.api.address")
48        })?;
49
50        // Bind the TCP listener first to ensure the port is available
51        // This will fail fast if the address is already in use
52        let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
53            crate::Error::from(format!("Failed to bind gRPC API server to {}: {}", addr, e))
54        })?;
55
56        let actual_addr = listener
57            .local_addr()
58            .map_err(|e| crate::Error::from(format!("Failed to get local address: {}", e)))?;
59
60        info!("GRPC API server bound to {}.", actual_addr);
61
62        let service = ObservabilityService::new(watch_rx);
63
64        // Create the standard gRPC health service (grpc.health.v1.Health).
65        // The empty service ("") is registered as SERVING by default.
66        let (health_reporter, health_service) = health_reporter();
67
68        let serving: ServingState = Arc::new(AtomicBool::new(true));
69
70        let (_shutdown, rx) = oneshot::channel();
71
72        // Convert the tokio TcpListener into a std listener for hyper's Server.
73        let std_listener = listener
74            .into_std()
75            .map_err(|e| crate::Error::from(format!("Failed to convert TCP listener: {}", e)))?;
76        std_listener.set_nonblocking(true).map_err(|e| {
77            crate::Error::from(format!("Failed to set TCP listener non-blocking: {}", e))
78        })?;
79
80        let router_serving = Arc::clone(&serving);
81
82        // Spawn the server with the already-bound listener
83        tokio::spawn(async move {
84            // Build reflection service for tools like grpcurl
85            let reflection_service = tonic_reflection::server::Builder::configure()
86                .register_encoded_file_descriptor_set(
87                    crate::proto::observability::FILE_DESCRIPTOR_SET,
88                )
89                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
90                .build()
91                .expect("Failed to build reflection service");
92
93            // Build the tonic router (gRPC services) and merge with the HTTP router
94            // so both protocols share the same port. `accept_http1(true)` lets plain
95            // HTTP/1.1 requests reach the merged axum routes.
96            let router = TonicServer::builder()
97                .accept_http1(true)
98                .add_service(health_service)
99                .add_service(ObservabilityServer::new(service))
100                .add_service(reflection_service)
101                .into_router()
102                .merge(http_router(router_serving));
103
104            let result = hyper::Server::from_tcp(std_listener)
105                .expect("Failed to build HTTP server from TCP listener")
106                .serve(router.into_make_service())
107                .with_graceful_shutdown(async {
108                    rx.await.ok();
109                    info!("GRPC API server shutting down.");
110                })
111                .await;
112
113            if let Err(e) = result {
114                error!(
115                    message = "GRPC server encountered an error.",
116                    error = %e,
117                    error_source = ?e.source(),
118                    bind_addr = %actual_addr,
119                );
120            }
121        });
122
123        info!("GRPC API server started on {}.", actual_addr);
124
125        Ok(Self {
126            _shutdown,
127            health_reporter,
128            serving,
129            addr: actual_addr,
130        })
131    }
132
133    /// Signal that the server is no longer serving.
134    ///
135    /// Call this **before** draining the topology so that Kubernetes gRPC
136    /// readiness probes and HTTP `/health` probes fail early and the pod is
137    /// removed from endpoints before the process exits.
138    pub async fn set_not_serving(&mut self) {
139        self.serving.store(false, Ordering::Relaxed);
140        self.health_reporter
141            .set_service_status("", tonic_health::ServingStatus::NotServing)
142            .await;
143    }
144
145    /// Get the address the server is listening on
146    pub const fn addr(&self) -> SocketAddr {
147        self.addr
148    }
149}
150
151/// Axum router exposing `GET`/`HEAD /health`.
152///
153/// Returns `200 {"ok":true}` while the server is serving and
154/// `503 {"ok":false}` once [`GrpcServer::set_not_serving`] has been called.
155/// Matches the response shape of the pre-gRPC GraphQL-era endpoint so
156/// existing HTTP health probes (Kubernetes, load balancers) keep working.
157fn http_router(state: ServingState) -> Router {
158    Router::new()
159        .route("/health", get(health_handler).head(health_handler))
160        .with_state(state)
161}
162
163async fn health_handler(State(state): State<ServingState>) -> impl IntoResponse {
164    if state.load(Ordering::Relaxed) {
165        (
166            StatusCode::OK,
167            [(header::CONTENT_TYPE, "application/json")],
168            r#"{"ok":true}"#,
169        )
170    } else {
171        (
172            StatusCode::SERVICE_UNAVAILABLE,
173            [(header::CONTENT_TYPE, "application/json")],
174            r#"{"ok":false}"#,
175        )
176    }
177}