vector/sources/util/grpc/
mod.rs1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use http::{Request, Response};
5use hyper::Body;
6use tonic::{
7 body::BoxBody,
8 server::NamedService,
9 transport::server::{Routes, Server},
10};
11use tower::Service;
12use tower_http::{
13 classify::{GrpcErrorsAsFailures, SharedClassifier},
14 trace::TraceLayer,
15};
16use tracing::Span;
17
18use crate::{
19 internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent},
20 shutdown::{ShutdownSignal, ShutdownSignalToken},
21 tls::MaybeTlsSettings,
22};
23
24mod decompression;
25pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};
26
27pub async fn run_grpc_server<S>(
28 address: SocketAddr,
29 tls_settings: MaybeTlsSettings,
30 service: S,
31 shutdown: ShutdownSignal,
32) -> crate::Result<()>
33where
34 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
35 + NamedService
36 + Clone
37 + Send
38 + 'static,
39 S::Future: Send + 'static,
40{
41 let span = Span::current();
42 let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
43 let listener = tls_settings.bind(&address).await?;
44 let stream = listener.accept_stream();
45
46 info!(%address, "Building gRPC server.");
47
48 Server::builder()
49 .layer(build_grpc_trace_layer(span.clone()))
50 .layer(DecompressionAndMetricsLayer)
60 .add_service(service)
61 .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
62 .await?;
63
64 drop(rx.await);
65
66 Ok(())
67}
68
69pub async fn run_grpc_server_with_routes(
72 address: SocketAddr,
73 tls_settings: MaybeTlsSettings,
74 routes: Routes,
75 shutdown: ShutdownSignal,
76) -> crate::Result<()> {
77 let span = Span::current();
78 let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
79 let listener = tls_settings.bind(&address).await?;
80 let stream = listener.accept_stream();
81
82 info!(%address, "Building gRPC server.");
83
84 Server::builder()
85 .layer(build_grpc_trace_layer(span.clone()))
86 .layer(DecompressionAndMetricsLayer)
87 .add_routes(routes)
88 .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
89 .await?;
90
91 drop(rx.await);
92
93 Ok(())
94}
95
96pub fn build_grpc_trace_layer(
100 span: Span,
101) -> TraceLayer<
102 SharedClassifier<GrpcErrorsAsFailures>,
103 impl Fn(&Request<Body>) -> Span + Clone,
104 impl Fn(&Request<Body>, &Span) + Clone,
105 impl Fn(&Response<BoxBody>, Duration, &Span) + Clone,
106 (),
107 (),
108 (),
109> {
110 TraceLayer::new_for_grpc()
111 .make_span_with(move |request: &Request<Body>| {
112 let mut path = request.uri().path().split('/');
114 let service = path.nth(1).unwrap_or("_unknown");
115 let method = path.next().unwrap_or("_unknown");
116
117 error_span!(
119 parent: &span,
120 "grpc-request",
121 grpc_service = service,
122 grpc_method = method,
123 )
124 })
125 .on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
126 emit!(GrpcServerRequestReceived);
127 }))
128 .on_response(
129 |response: &Response<BoxBody>, latency: Duration, _span: &Span| {
130 emit!(GrpcServerResponseSent { response, latency });
131 },
132 )
133 .on_failure(())
134 .on_body_chunk(())
135 .on_eos(())
136}