vector/internal_events/
grpc.rs

1use std::time::Duration;
2
3use http::response::Response;
4use metrics::{counter, histogram};
5use tonic::Code;
6use vector_lib::internal_event::InternalEvent;
7use vector_lib::internal_event::{error_stage, error_type};
8
/// Label key attached to the gRPC server response metrics; its value is the
/// status name produced by `grpc_code_to_name`.
9const GRPC_STATUS_LABEL: &str = "grpc_status";
10
11#[derive(Debug)]
12pub struct GrpcServerRequestReceived;
13
14impl InternalEvent for GrpcServerRequestReceived {
15    fn emit(self) {
16        counter!("grpc_server_messages_received_total").increment(1);
17    }
18}
19
20#[derive(Debug)]
21pub struct GrpcServerResponseSent<'a, B> {
22    pub response: &'a Response<B>,
23    pub latency: Duration,
24}
25
26impl<B> InternalEvent for GrpcServerResponseSent<'_, B> {
27    fn emit(self) {
28        let grpc_code = self
29            .response
30            .headers()
31            .get("grpc-status")
32            // The header value is missing on success.
33            .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes()));
34        let grpc_code = grpc_code_to_name(grpc_code);
35
36        let labels = &[(GRPC_STATUS_LABEL, grpc_code)];
37        counter!("grpc_server_messages_sent_total", labels).increment(1);
38        histogram!("grpc_server_handler_duration_seconds", labels).record(self.latency);
39    }
40}
41
42#[derive(Debug)]
43pub struct GrpcInvalidCompressionSchemeError<'a> {
44    pub status: &'a tonic::Status,
45}
46
47impl InternalEvent for GrpcInvalidCompressionSchemeError<'_> {
48    fn emit(self) {
49        error!(
50            message = "Invalid compression scheme.",
51            error = ?self.status.message(),
52            error_type = error_type::REQUEST_FAILED,
53            stage = error_stage::RECEIVING,
54            internal_log_rate_limit = true
55        );
56        counter!(
57            "component_errors_total",
58            "error_type" => error_type::REQUEST_FAILED,
59            "stage" => error_stage::RECEIVING,
60        )
61        .increment(1);
62    }
63}
64
65#[derive(Debug)]
66pub struct GrpcError<E> {
67    pub error: E,
68}
69
70impl<E> InternalEvent for GrpcError<E>
71where
72    E: std::fmt::Display,
73{
74    fn emit(self) {
75        error!(
76            message = "Grpc error.",
77            error = %self.error,
78            error_type = error_type::REQUEST_FAILED,
79            stage = error_stage::RECEIVING,
80            internal_log_rate_limit = true
81        );
82        counter!(
83            "component_errors_total",
84            "error_type" => error_type::REQUEST_FAILED,
85            "stage" => error_stage::RECEIVING,
86        )
87        .increment(1);
88    }
89}
90
91const fn grpc_code_to_name(code: Code) -> &'static str {
92    match code {
93        Code::Ok => "Ok",
94        Code::Cancelled => "Cancelled",
95        Code::Unknown => "Unknown",
96        Code::InvalidArgument => "InvalidArgument",
97        Code::DeadlineExceeded => "DeadlineExceeded",
98        Code::NotFound => "NotFound",
99        Code::AlreadyExists => "AlreadyExists",
100        Code::PermissionDenied => "PermissionDenied",
101        Code::ResourceExhausted => "ResourceExhausted",
102        Code::FailedPrecondition => "FailedPrecondition",
103        Code::Aborted => "Aborted",
104        Code::OutOfRange => "OutOfRange",
105        Code::Unimplemented => "Unimplemented",
106        Code::Internal => "Internal",
107        Code::Unavailable => "Unavailable",
108        Code::DataLoss => "DataLoss",
109        Code::Unauthenticated => "Unauthenticated",
110    }
111}