vector/internal_events/
grpc.rs1use std::time::Duration;
2
3use http::response::Response;
4use metrics::{counter, histogram};
5use tonic::Code;
6use vector_lib::NamedInternalEvent;
7use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
8
9const GRPC_STATUS_LABEL: &str = "grpc_status";
10
11#[derive(Debug, NamedInternalEvent)]
12pub struct GrpcServerRequestReceived;
13
14impl InternalEvent for GrpcServerRequestReceived {
15 fn emit(self) {
16 counter!("grpc_server_messages_received_total").increment(1);
17 }
18}
19
20#[derive(Debug, NamedInternalEvent)]
21pub struct GrpcServerResponseSent<'a, B> {
22 pub response: &'a Response<B>,
23 pub latency: Duration,
24}
25
26impl<B> InternalEvent for GrpcServerResponseSent<'_, B> {
27 fn emit(self) {
28 let grpc_code = self
29 .response
30 .headers()
31 .get("grpc-status")
32 .map_or(tonic::Code::Ok, |v| tonic::Code::from_bytes(v.as_bytes()));
34 let grpc_code = grpc_code_to_name(grpc_code);
35
36 let labels = &[(GRPC_STATUS_LABEL, grpc_code)];
37 counter!("grpc_server_messages_sent_total", labels).increment(1);
38 histogram!("grpc_server_handler_duration_seconds", labels).record(self.latency);
39 }
40}
41
42#[derive(Debug, NamedInternalEvent)]
43pub struct GrpcInvalidCompressionSchemeError<'a> {
44 pub status: &'a tonic::Status,
45}
46
47impl InternalEvent for GrpcInvalidCompressionSchemeError<'_> {
48 fn emit(self) {
49 error!(
50 message = "Invalid compression scheme.",
51 error = ?self.status.message(),
52 error_type = error_type::REQUEST_FAILED,
53 stage = error_stage::RECEIVING
54 );
55 counter!(
56 "component_errors_total",
57 "error_type" => error_type::REQUEST_FAILED,
58 "stage" => error_stage::RECEIVING,
59 )
60 .increment(1);
61 }
62}
63
64#[derive(Debug, NamedInternalEvent)]
65pub struct GrpcError<E> {
66 pub error: E,
67}
68
69impl<E> InternalEvent for GrpcError<E>
70where
71 E: std::fmt::Display,
72{
73 fn emit(self) {
74 error!(
75 message = "Grpc error.",
76 error = %self.error,
77 error_type = error_type::REQUEST_FAILED,
78 stage = error_stage::RECEIVING
79 );
80 counter!(
81 "component_errors_total",
82 "error_type" => error_type::REQUEST_FAILED,
83 "stage" => error_stage::RECEIVING,
84 )
85 .increment(1);
86 }
87}
88
89const fn grpc_code_to_name(code: Code) -> &'static str {
90 match code {
91 Code::Ok => "Ok",
92 Code::Cancelled => "Cancelled",
93 Code::Unknown => "Unknown",
94 Code::InvalidArgument => "InvalidArgument",
95 Code::DeadlineExceeded => "DeadlineExceeded",
96 Code::NotFound => "NotFound",
97 Code::AlreadyExists => "AlreadyExists",
98 Code::PermissionDenied => "PermissionDenied",
99 Code::ResourceExhausted => "ResourceExhausted",
100 Code::FailedPrecondition => "FailedPrecondition",
101 Code::Aborted => "Aborted",
102 Code::OutOfRange => "OutOfRange",
103 Code::Unimplemented => "Unimplemented",
104 Code::Internal => "Internal",
105 Code::Unavailable => "Unavailable",
106 Code::DataLoss => "DataLoss",
107 Code::Unauthenticated => "Unauthenticated",
108 }
109}