vector/sources/util/grpc/
mod.rs

1use crate::{
2    internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent},
3    shutdown::{ShutdownSignal, ShutdownSignalToken},
4    tls::MaybeTlsSettings,
5};
6use futures::FutureExt;
7use http::{Request, Response};
8use hyper::Body;
9use std::{convert::Infallible, net::SocketAddr, time::Duration};
10use tonic::transport::server::Routes;
11use tonic::{body::BoxBody, server::NamedService, transport::server::Server};
12use tower::Service;
13use tower_http::{
14    classify::{GrpcErrorsAsFailures, SharedClassifier},
15    trace::TraceLayer,
16};
17use tracing::Span;
18
19mod decompression;
20pub use self::decompression::{DecompressionAndMetrics, DecompressionAndMetricsLayer};
21
22pub async fn run_grpc_server<S>(
23    address: SocketAddr,
24    tls_settings: MaybeTlsSettings,
25    service: S,
26    shutdown: ShutdownSignal,
27) -> crate::Result<()>
28where
29    S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
30        + NamedService
31        + Clone
32        + Send
33        + 'static,
34    S::Future: Send + 'static,
35{
36    let span = Span::current();
37    let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
38    let listener = tls_settings.bind(&address).await?;
39    let stream = listener.accept_stream();
40
41    info!(%address, "Building gRPC server.");
42
43    Server::builder()
44        .layer(build_grpc_trace_layer(span.clone()))
45        // This layer explicitly decompresses payloads, if compressed, and reports the number of message bytes we've
46        // received if the message is processed successfully, aka `BytesReceived`. We do this because otherwise the only
47        // access we have is either the event-specific bytes (the in-memory representation) or the raw bytes over the
48        // wire prior to decompression... and if that case, any bytes at all, not just the ones we successfully process.
49        //
50        // The weaving of `tonic`, `axum`, `tower`, and `hyper` is fairly complex and there currently exists no way to
51        // use independent `tower` layers when the request body itself (the body type, not the actual bytes) must be
52        // modified or wrapped.. so instead of a cleaner design, we're opting here to bake it all together until the
53        // crates are sufficiently flexible for us to craft a better design.
54        .layer(DecompressionAndMetricsLayer)
55        .add_service(service)
56        .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
57        .await?;
58
59    drop(rx.await);
60
61    Ok(())
62}
63
64// This is a bit of a ugly hack to allow us to run two services on the same port.
65// I just don't know how to convert the generic type with associated types into a Vec<Box<trait object>>.
66pub async fn run_grpc_server_with_routes(
67    address: SocketAddr,
68    tls_settings: MaybeTlsSettings,
69    routes: Routes,
70    shutdown: ShutdownSignal,
71) -> crate::Result<()> {
72    let span = Span::current();
73    let (tx, rx) = tokio::sync::oneshot::channel::<ShutdownSignalToken>();
74    let listener = tls_settings.bind(&address).await?;
75    let stream = listener.accept_stream();
76
77    info!(%address, "Building gRPC server.");
78
79    Server::builder()
80        .layer(build_grpc_trace_layer(span.clone()))
81        .layer(DecompressionAndMetricsLayer)
82        .add_routes(routes)
83        .serve_with_incoming_shutdown(stream, shutdown.map(|token| tx.send(token).unwrap()))
84        .await?;
85
86    drop(rx.await);
87
88    Ok(())
89}
90
91/// Builds a [TraceLayer] configured for a gRPC server.
92///
93/// This layer emits gPRC specific telemetry for messages received/sent and handler duration.
94pub fn build_grpc_trace_layer(
95    span: Span,
96) -> TraceLayer<
97    SharedClassifier<GrpcErrorsAsFailures>,
98    impl Fn(&Request<Body>) -> Span + Clone,
99    impl Fn(&Request<Body>, &Span) + Clone,
100    impl Fn(&Response<BoxBody>, Duration, &Span) + Clone,
101    (),
102    (),
103    (),
104> {
105    TraceLayer::new_for_grpc()
106        .make_span_with(move |request: &Request<Body>| {
107            // The path is defined as “/” {service name} “/” {method name}.
108            let mut path = request.uri().path().split('/');
109            let service = path.nth(1).unwrap_or("_unknown");
110            let method = path.next().unwrap_or("_unknown");
111
112            // This is an error span so that the labels are always present for metrics.
113            error_span!(
114               parent: &span,
115               "grpc-request",
116               grpc_service = service,
117               grpc_method = method,
118            )
119        })
120        .on_request(Box::new(|_request: &Request<Body>, _span: &Span| {
121            emit!(GrpcServerRequestReceived);
122        }))
123        .on_response(
124            |response: &Response<BoxBody>, latency: Duration, _span: &Span| {
125                emit!(GrpcServerResponseSent { response, latency });
126            },
127        )
128        .on_failure(())
129        .on_body_chunk(())
130        .on_eos(())
131}