vector/internal_events/
gcp_pubsub.rs

1use metrics::counter;
2use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
3
4pub struct GcpPubsubConnectError {
5    pub error: tonic::transport::Error,
6}
7
8impl InternalEvent for GcpPubsubConnectError {
9    fn emit(self) {
10        error!(
11            message = "Failed to connect to the server.",
12            error = %self.error,
13            error_code = "failed_connecting",
14            error_type = error_type::CONNECTION_FAILED,
15            stage = error_stage::RECEIVING,
16            internal_log_rate_limit = true,
17        );
18
19        counter!(
20            "component_errors_total",
21            "error_code" => "failed_connecting",
22            "error_type" => error_type::CONNECTION_FAILED,
23            "stage" => error_stage::RECEIVING,
24        )
25        .increment(1);
26    }
27}
28
29pub struct GcpPubsubStreamingPullError {
30    pub error: tonic::Status,
31}
32
33impl InternalEvent for GcpPubsubStreamingPullError {
34    fn emit(self) {
35        error!(
36            message = "Failed to set up streaming pull.",
37            error = %self.error,
38            error_code = "failed_streaming_pull",
39            error_type = error_type::REQUEST_FAILED,
40            stage = error_stage::RECEIVING,
41            internal_log_rate_limit = true,
42        );
43
44        counter!(
45            "component_errors_total",
46            "error_code" => "failed_streaming_pull",
47            "error_type" => error_type::REQUEST_FAILED,
48            "stage" => error_stage::RECEIVING,
49        )
50        .increment(1);
51    }
52}
53
54pub struct GcpPubsubReceiveError {
55    pub error: tonic::Status,
56}
57
58impl InternalEvent for GcpPubsubReceiveError {
59    fn emit(self) {
60        error!(
61            message = "Failed to fetch events.",
62            error = %self.error,
63            error_code = "failed_fetching_events",
64            error_type = error_type::REQUEST_FAILED,
65            stage = error_stage::RECEIVING,
66            internal_log_rate_limit = true,
67        );
68
69        counter!(
70            "component_errors_total",
71            "error_code" => "failed_fetching_events",
72            "error_type" => error_type::REQUEST_FAILED,
73            "stage" => error_stage::RECEIVING,
74        )
75        .increment(1);
76    }
77}