vector/api/
grpc_server.rs1use std::{
2 error::Error as StdError,
3 net::SocketAddr,
4 sync::{
5 Arc,
6 atomic::{AtomicBool, Ordering},
7 },
8};
9
10use axum::{
11 Router,
12 extract::State,
13 http::{StatusCode, header},
14 response::IntoResponse,
15 routing::get,
16};
17use tokio::sync::oneshot;
18use tonic::transport::Server as TonicServer;
19use tonic_health::server::{HealthReporter, health_reporter};
20use vector_lib::tap::topology::WatchRx;
21
22use super::grpc::ObservabilityService;
23use crate::{config::Config, proto::observability::Server as ObservabilityServer};
24
25type ServingState = Arc<AtomicBool>;
28
29pub struct GrpcServer {
31 _shutdown: oneshot::Sender<()>,
32 health_reporter: HealthReporter,
33 serving: ServingState,
34 addr: SocketAddr,
35}
36
37impl GrpcServer {
38 pub async fn start(config: &Config, watch_rx: WatchRx) -> crate::Result<Self> {
46 let addr = config.api.address.ok_or_else(|| {
47 crate::Error::from("API address not configured in config.api.address")
48 })?;
49
50 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
53 crate::Error::from(format!("Failed to bind gRPC API server to {}: {}", addr, e))
54 })?;
55
56 let actual_addr = listener
57 .local_addr()
58 .map_err(|e| crate::Error::from(format!("Failed to get local address: {}", e)))?;
59
60 info!("GRPC API server bound to {}.", actual_addr);
61
62 let service = ObservabilityService::new(watch_rx);
63
64 let (health_reporter, health_service) = health_reporter();
67
68 let serving: ServingState = Arc::new(AtomicBool::new(true));
69
70 let (_shutdown, rx) = oneshot::channel();
71
72 let std_listener = listener
74 .into_std()
75 .map_err(|e| crate::Error::from(format!("Failed to convert TCP listener: {}", e)))?;
76 std_listener.set_nonblocking(true).map_err(|e| {
77 crate::Error::from(format!("Failed to set TCP listener non-blocking: {}", e))
78 })?;
79
80 let router_serving = Arc::clone(&serving);
81
82 tokio::spawn(async move {
84 let reflection_service = tonic_reflection::server::Builder::configure()
86 .register_encoded_file_descriptor_set(
87 crate::proto::observability::FILE_DESCRIPTOR_SET,
88 )
89 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
90 .build()
91 .expect("Failed to build reflection service");
92
93 let router = TonicServer::builder()
97 .accept_http1(true)
98 .add_service(health_service)
99 .add_service(ObservabilityServer::new(service))
100 .add_service(reflection_service)
101 .into_router()
102 .merge(http_router(router_serving));
103
104 let result = hyper::Server::from_tcp(std_listener)
105 .expect("Failed to build HTTP server from TCP listener")
106 .serve(router.into_make_service())
107 .with_graceful_shutdown(async {
108 rx.await.ok();
109 info!("GRPC API server shutting down.");
110 })
111 .await;
112
113 if let Err(e) = result {
114 error!(
115 message = "GRPC server encountered an error.",
116 error = %e,
117 error_source = ?e.source(),
118 bind_addr = %actual_addr,
119 );
120 }
121 });
122
123 info!("GRPC API server started on {}.", actual_addr);
124
125 Ok(Self {
126 _shutdown,
127 health_reporter,
128 serving,
129 addr: actual_addr,
130 })
131 }
132
133 pub async fn set_not_serving(&mut self) {
139 self.serving.store(false, Ordering::Relaxed);
140 self.health_reporter
141 .set_service_status("", tonic_health::ServingStatus::NotServing)
142 .await;
143 }
144
145 pub const fn addr(&self) -> SocketAddr {
147 self.addr
148 }
149}
150
151fn http_router(state: ServingState) -> Router {
158 Router::new()
159 .route("/health", get(health_handler).head(health_handler))
160 .with_state(state)
161}
162
163async fn health_handler(State(state): State<ServingState>) -> impl IntoResponse {
164 if state.load(Ordering::Relaxed) {
165 (
166 StatusCode::OK,
167 [(header::CONTENT_TYPE, "application/json")],
168 r#"{"ok":true}"#,
169 )
170 } else {
171 (
172 StatusCode::SERVICE_UNAVAILABLE,
173 [(header::CONTENT_TYPE, "application/json")],
174 r#"{"ok":false}"#,
175 )
176 }
177}