vector/sources/util/grpc/
mod.rs

1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use http::{Request, Response};
5use hyper::Body;
6use tonic::{
7    body::BoxBody,
8    server::NamedService,
9    transport::server::{Routes, Server},
10};
11use tower::Service;
12use tower_http::{
13    classify::{GrpcErrorsAsFailures, SharedClassifier},
14    trace::TraceLayer,
15};
16use tracing::Span;
17
18use crate::{
19    internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent},
20    shutdown::{ShutdownSignal, ShutdownSignalToken},
21    tls::MaybeTlsSettings,
22};
23
24mod decompression;
25pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};
26
27pub async fn run_grpc_server<S>(
28    address: SocketAddr,
29    tls_settings: MaybeTlsSettings,
30    service: S,
31    shutdown: ShutdownSignal,
32) -> crate::Result<()>
33where
34    S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
35        + NamedService
36        + Clone
37        + Send
38        + 'static,
39    S::Future: Send + 'static,
40{
41    let span = Span::current();
42    let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
43    let listener = tls_settings.bind(&address).await?;
44    let stream = listener.accept_stream();
45
46    info!(%address, "Building gRPC server.");
47
48    Server::builder()
49        .layer(build_grpc_trace_layer(span.clone()))
50        // This layer explicitly decompresses payloads, if compressed, and reports the number of message bytes we've
51        // received if the message is processed successfully, aka `BytesReceived`. We do this because otherwise the only
52        // access we have is either the event-specific bytes (the in-memory representation) or the raw bytes over the
53        // wire prior to decompression... and if that case, any bytes at all, not just the ones we successfully process.
54        //
55        // The weaving of `tonic`, `axum`, `tower`, and `hyper` is fairly complex and there currently exists no way to
56        // use independent `tower` layers when the request body itself (the body type, not the actual bytes) must be
57        // modified or wrapped.. so instead of a cleaner design, we're opting here to bake it all together until the
58        // crates are sufficiently flexible for us to craft a better design.
59        .layer(DecompressionAndMetricsLayer)
60        .add_service(service)
61        .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
62        .await?;
63
64    drop(rx.await);
65
66    Ok(())
67}
68
69// This is a bit of a ugly hack to allow us to run two services on the same port.
70// I just don't know how to convert the generic type with associated types into a Vec<Box<trait object>>.
71pub async fn run_grpc_server_with_routes(
72    address: SocketAddr,
73    tls_settings: MaybeTlsSettings,
74    routes: Routes,
75    shutdown: ShutdownSignal,
76) -> crate::Result<()> {
77    let span = Span::current();
78    let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
79    let listener = tls_settings.bind(&address).await?;
80    let stream = listener.accept_stream();
81
82    info!(%address, "Building gRPC server.");
83
84    Server::builder()
85        .layer(build_grpc_trace_layer(span.clone()))
86        .layer(DecompressionAndMetricsLayer)
87        .add_routes(routes)
88        .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
89        .await?;
90
91    drop(rx.await);
92
93    Ok(())
94}
95
96/// Builds a [TraceLayer] configured for a gRPC server.
97///
98/// This layer emits gPRC specific telemetry for messages received/sent and handler duration.
99pub fn build_grpc_trace_layer(
100    span: Span,
101) -> TraceLayer<
102    SharedClassifier<GrpcErrorsAsFailures>,
103    impl Fn(&Request<Body>) -> Span + Clone,
104    impl Fn(&Request<Body>, &Span) + Clone,
105    impl Fn(&Response<BoxBody>, Duration, &Span) + Clone,
106    (),
107    (),
108    (),
109> {
110    TraceLayer::new_for_grpc()
111        .make_span_with(move |request: &Request<Body>| {
112            // The path is defined as “/” {service name} “/” {method name}.
113            let mut path = request.uri().path().split('/');
114            let service = path.nth(1).unwrap_or("_unknown");
115            let method = path.next().unwrap_or("_unknown");
116
117            // This is an error span so that the labels are always present for metrics.
118            error_span!(
119               parent: &span,
120               "grpc-request",
121               grpc_service = service,
122               grpc_method = method,
123            )
124        })
125        .on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
126            emit!(GrpcServerRequestReceived);
127        }))
128        .on_response(
129            |response: &Response<BoxBody>, latency: Duration, _span: &Span| {
130                emit!(GrpcServerResponseSent { response, latency });
131            },
132        )
133        .on_failure(())
134        .on_body_chunk(())
135        .on_eos(())
136}