vector/internal_events/
gcp_pubsub.rs

1use metrics::counter;
2use vector_lib::internal_event::InternalEvent;
3use vector_lib::internal_event::{error_stage, error_type};
4
5pub struct GcpPubsubConnectError {
6    pub error: tonic::transport::Error,
7}
8
9impl InternalEvent for GcpPubsubConnectError {
10    fn emit(self) {
11        error!(
12            message = "Failed to connect to the server.",
13            error = %self.error,
14            error_code = "failed_connecting",
15            error_type = error_type::CONNECTION_FAILED,
16            stage = error_stage::RECEIVING,
17            internal_log_rate_limit = true,
18        );
19
20        counter!(
21            "component_errors_total",
22            "error_code" => "failed_connecting",
23            "error_type" => error_type::CONNECTION_FAILED,
24            "stage" => error_stage::RECEIVING,
25        )
26        .increment(1);
27    }
28}
29
30pub struct GcpPubsubStreamingPullError {
31    pub error: tonic::Status,
32}
33
34impl InternalEvent for GcpPubsubStreamingPullError {
35    fn emit(self) {
36        error!(
37            message = "Failed to set up streaming pull.",
38            error = %self.error,
39            error_code = "failed_streaming_pull",
40            error_type = error_type::REQUEST_FAILED,
41            stage = error_stage::RECEIVING,
42            internal_log_rate_limit = true,
43        );
44
45        counter!(
46            "component_errors_total",
47            "error_code" => "failed_streaming_pull",
48            "error_type" => error_type::REQUEST_FAILED,
49            "stage" => error_stage::RECEIVING,
50        )
51        .increment(1);
52    }
53}
54
55pub struct GcpPubsubReceiveError {
56    pub error: tonic::Status,
57}
58
59impl InternalEvent for GcpPubsubReceiveError {
60    fn emit(self) {
61        error!(
62            message = "Failed to fetch events.",
63            error = %self.error,
64            error_code = "failed_fetching_events",
65            error_type = error_type::REQUEST_FAILED,
66            stage = error_stage::RECEIVING,
67            internal_log_rate_limit = true,
68        );
69
70        counter!(
71            "component_errors_total",
72            "error_code" => "failed_fetching_events",
73            "error_type" => error_type::REQUEST_FAILED,
74            "stage" => error_stage::RECEIVING,
75        )
76        .increment(1);
77    }
78}