vector/sources/util/grpc/
mod.rs1use crate::{
2 internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent},
3 shutdown::{ShutdownSignal, ShutdownSignalToken},
4 tls::MaybeTlsSettings,
5};
6use futures::FutureExt;
7use http::{Request, Response};
8use hyper::Body;
9use std::{convert::Infallible, net::SocketAddr, time::Duration};
10use tonic::transport::server::Routes;
11use tonic::{body::BoxBody, server::NamedService, transport::server::Server};
12use tower::Service;
13use tower_http::{
14 classify::{GrpcErrorsAsFailures, SharedClassifier},
15 trace::TraceLayer,
16};
17use tracing::Span;
18
19mod decompression;
20pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};
21
22pub async fn run_grpc_server<S>(
23 address: SocketAddr,
24 tls_settings: MaybeTlsSettings,
25 service: S,
26 shutdown: ShutdownSignal,
27) -> crate::Result<()>
28where
29 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
30 + NamedService
31 + Clone
32 + Send
33 + 'static,
34 S::Future: Send + 'static,
35{
36 let span = Span::current();
37 let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
38 let listener = tls_settings.bind(&address).await?;
39 let stream = listener.accept_stream();
40
41 info!(%address, "Building gRPC server.");
42
43 Server::builder()
44 .layer(build_grpc_trace_layer(span.clone()))
45 .layer(DecompressionAndMetricsLayer)
55 .add_service(service)
56 .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
57 .await?;
58
59 drop(rx.await);
60
61 Ok(())
62}
63
64pub async fn run_grpc_server_with_routes(
67 address: SocketAddr,
68 tls_settings: MaybeTlsSettings,
69 routes: Routes,
70 shutdown: ShutdownSignal,
71) -> crate::Result<()> {
72 let span = Span::current();
73 let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
74 let listener = tls_settings.bind(&address).await?;
75 let stream = listener.accept_stream();
76
77 info!(%address, "Building gRPC server.");
78
79 Server::builder()
80 .layer(build_grpc_trace_layer(span.clone()))
81 .layer(DecompressionAndMetricsLayer)
82 .add_routes(routes)
83 .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
84 .await?;
85
86 drop(rx.await);
87
88 Ok(())
89}
90
91pub fn build_grpc_trace_layer(
95 span: Span,
96) -> TraceLayer<
97 SharedClassifier<GrpcErrorsAsFailures>,
98 impl Fn(&Request<Body>) -> Span + Clone,
99 impl Fn(&Request<Body>, &Span) + Clone,
100 impl Fn(&Response<BoxBody>, Duration, &Span) + Clone,
101 (),
102 (),
103 (),
104> {
105 TraceLayer::new_for_grpc()
106 .make_span_with(move |request: &Request<Body>| {
107 let mut path = request.uri().path().split('/');
109 let service = path.nth(1).unwrap_or("_unknown");
110 let method = path.next().unwrap_or("_unknown");
111
112 error_span!(
114 parent: &span,
115 "grpc-request",
116 grpc_service = service,
117 grpc_method = method,
118 )
119 })
120 .on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
121 emit!(GrpcServerRequestReceived);
122 }))
123 .on_response(
124 |response: &Response<BoxBody>, latency: Duration, _span: &Span| {
125 emit!(GrpcServerResponseSent { response, latency });
126 },
127 )
128 .on_failure(())
129 .on_body_chunk(())
130 .on_eos(())
131}