vector/internal_events/
grpc.rs

1use std::time::Duration;
2
3use http::response::Response;
4use metrics::{counter, histogram};
5use tonic::Code;
6use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
7
8const GRPC_STATUS_LABEL: &str = "grpc_status";
9
10#[derive(Debug)]
11pub struct GrpcServerRequestReceived;
12
13impl InternalEvent for GrpcServerRequestReceived {
14    fn emit(self) {
15        counter!("grpc_server_messages_received_total").increment(1);
16    }
17}
18
19#[derive(Debug)]
20pub struct GrpcServerResponseSent<'a, B> {
21    pub response: &'a Response<B>,
22    pub latency: Duration,
23}
24
25impl<B> InternalEvent for GrpcServerResponseSent<'_, B> {
26    fn emit(self) {
27        let grpc_code = self
28            .response
29            .headers()
30            .get("grpc-status")
31            // The header value is missing on success.
32            .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes()));
33        let grpc_code = grpc_code_to_name(grpc_code);
34
35        let labels = &[(GRPC_STATUS_LABEL, grpc_code)];
36        counter!("grpc_server_messages_sent_total", labels).increment(1);
37        histogram!("grpc_server_handler_duration_seconds", labels).record(self.latency);
38    }
39}
40
41#[derive(Debug)]
42pub struct GrpcInvalidCompressionSchemeError<'a> {
43    pub status: &'a tonic::Status,
44}
45
46impl InternalEvent for GrpcInvalidCompressionSchemeError<'_> {
47    fn emit(self) {
48        error!(
49            message = "Invalid compression scheme.",
50            error = ?self.status.message(),
51            error_type = error_type::REQUEST_FAILED,
52            stage = error_stage::RECEIVING,
53            internal_log_rate_limit = true
54        );
55        counter!(
56            "component_errors_total",
57            "error_type" => error_type::REQUEST_FAILED,
58            "stage" => error_stage::RECEIVING,
59        )
60        .increment(1);
61    }
62}
63
64#[derive(Debug)]
65pub struct GrpcError<E> {
66    pub error: E,
67}
68
69impl<E> InternalEvent for GrpcError<E>
70where
71    E: std::fmt::Display,
72{
73    fn emit(self) {
74        error!(
75            message = "Grpc error.",
76            error = %self.error,
77            error_type = error_type::REQUEST_FAILED,
78            stage = error_stage::RECEIVING,
79            internal_log_rate_limit = true
80        );
81        counter!(
82            "component_errors_total",
83            "error_type" => error_type::REQUEST_FAILED,
84            "stage" => error_stage::RECEIVING,
85        )
86        .increment(1);
87    }
88}
89
90const fn grpc_code_to_name(code: Code) -> &'static str {
91    match code {
92        Code::Ok => "Ok",
93        Code::Cancelled => "Cancelled",
94        Code::Unknown => "Unknown",
95        Code::InvalidArgument => "InvalidArgument",
96        Code::DeadlineExceeded => "DeadlineExceeded",
97        Code::NotFound => "NotFound",
98        Code::AlreadyExists => "AlreadyExists",
99        Code::PermissionDenied => "PermissionDenied",
100        Code::ResourceExhausted => "ResourceExhausted",
101        Code::FailedPrecondition => "FailedPrecondition",
102        Code::Aborted => "Aborted",
103        Code::OutOfRange => "OutOfRange",
104        Code::Unimplemented => "Unimplemented",
105        Code::Internal => "Internal",
106        Code::Unavailable => "Unavailable",
107        Code::DataLoss => "DataLoss",
108        Code::Unauthenticated => "Unauthenticated",
109    }
110}