1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use std::time::Duration;

use http::response::Response;
use metrics::{counter, histogram};
use tonic::Code;
use vector_lib::internal_event::InternalEvent;
use vector_lib::internal_event::{error_stage, error_type};

/// Label key under which the gRPC status name is attached to the
/// server response metrics emitted from this module.
const GRPC_STATUS_LABEL: &str = "grpc_status";

/// Internal event that records one message received by the gRPC server
/// (as named by the metric it updates).
#[derive(Debug)]
pub struct GrpcServerRequestReceived;

impl InternalEvent for GrpcServerRequestReceived {
    /// Increments the `grpc_server_messages_received_total` counter by one.
    fn emit(self) {
        let received = counter!("grpc_server_messages_received_total");
        received.increment(1);
    }
}

/// Internal event carrying a gRPC server response together with a latency
/// measurement.
///
/// Emitting it updates the `grpc_server_messages_sent_total` counter and the
/// `grpc_server_handler_duration_seconds` histogram, both labelled with the
/// name of the response's gRPC status code.
#[derive(Debug)]
pub struct GrpcServerResponseSent<'a, B> {
    /// Response whose `grpc-status` header determines the status label.
    pub response: &'a Response<B>,
    /// Duration recorded into the handler-duration histogram.
    pub latency: Duration,
}

impl<B> InternalEvent for GrpcServerResponseSent<'_, B> {
    /// Resolves the status label from the response headers, then records the
    /// counter increment and the latency sample under that label.
    fn emit(self) {
        // A response that carries no `grpc-status` header is treated as `Ok`.
        let status = match self.response.headers().get("grpc-status") {
            Some(raw) => tonic::Code::from_bytes(raw.as_bytes()),
            None => tonic::Code::Ok,
        };
        let status_name = grpc_code_to_name(status);

        // Both metrics share the same single-entry label set.
        let labels = &[(GRPC_STATUS_LABEL, status_name)];

        let sent = counter!("grpc_server_messages_sent_total", labels);
        sent.increment(1);

        let handler_duration = histogram!("grpc_server_handler_duration_seconds", labels);
        handler_duration.record(self.latency);
    }
}

/// Internal event reporting an invalid compression scheme, described by a
/// `tonic::Status`.
#[derive(Debug)]
pub struct GrpcInvalidCompressionSchemeError<'a> {
    /// Status whose message is attached to the log line as the error detail.
    pub status: &'a tonic::Status,
}

impl InternalEvent for GrpcInvalidCompressionSchemeError<'_> {
    /// Logs the status message at error level, then increments
    /// `component_errors_total` with the matching error type and stage.
    fn emit(self) {
        let detail = self.status.message();
        error!(
            message = "Invalid compression scheme.",
            error = ?detail,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::RECEIVING,
            internal_log_rate_limit = true
        );

        // The counter labels mirror the `error_type` / `stage` fields logged above.
        let errors = counter!(
            "component_errors_total",
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::RECEIVING,
        );
        errors.increment(1);
    }
}

/// Internal event wrapping an arbitrary gRPC error value.
#[derive(Debug)]
pub struct GrpcError<E> {
    /// The underlying error; rendered with `Display` when the event is emitted.
    pub error: E,
}

impl<E> InternalEvent for GrpcError<E>
where
    E: std::fmt::Display,
{
    /// Logs the error at error level, then increments
    /// `component_errors_total` with the matching error type and stage.
    fn emit(self) {
        let cause = &self.error;
        error!(
            message = "Grpc error.",
            error = %cause,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::RECEIVING,
            internal_log_rate_limit = true
        );

        // The counter labels mirror the `error_type` / `stage` fields logged above.
        let errors = counter!(
            "component_errors_total",
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::RECEIVING,
        );
        errors.increment(1);
    }
}

/// Maps a gRPC status [`Code`] to a static name string.
///
/// Each returned string is spelled the same as the enum variant it stands
/// for. The match has no wildcard arm, so every variant is listed explicitly;
/// arms are kept in alphabetical order.
const fn grpc_code_to_name(code: Code) -> &'static str {
    match code {
        Code::Aborted => "Aborted",
        Code::AlreadyExists => "AlreadyExists",
        Code::Cancelled => "Cancelled",
        Code::DataLoss => "DataLoss",
        Code::DeadlineExceeded => "DeadlineExceeded",
        Code::FailedPrecondition => "FailedPrecondition",
        Code::Internal => "Internal",
        Code::InvalidArgument => "InvalidArgument",
        Code::NotFound => "NotFound",
        Code::Ok => "Ok",
        Code::OutOfRange => "OutOfRange",
        Code::PermissionDenied => "PermissionDenied",
        Code::ResourceExhausted => "ResourceExhausted",
        Code::Unauthenticated => "Unauthenticated",
        Code::Unavailable => "Unavailable",
        Code::Unimplemented => "Unimplemented",
        Code::Unknown => "Unknown",
    }
}