vector/internal_events/
gcp_pubsub.rs

1use metrics::counter;
2use vector_lib::NamedInternalEvent;
3use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
4
5#[derive(NamedInternalEvent)]
6pub struct GcpPubsubConnectError {
7    pub error: tonic::transport::Error,
8}
9
10impl InternalEvent for GcpPubsubConnectError {
11    fn emit(self) {
12        error!(
13            message = "Failed to connect to the server.",
14            error = %self.error,
15            error_code = "failed_connecting",
16            error_type = error_type::CONNECTION_FAILED,
17            stage = error_stage::RECEIVING,
18        );
19
20        counter!(
21            "component_errors_total",
22            "error_code" => "failed_connecting",
23            "error_type" => error_type::CONNECTION_FAILED,
24            "stage" => error_stage::RECEIVING,
25        )
26        .increment(1);
27    }
28}
29
30#[derive(NamedInternalEvent)]
31pub struct GcpPubsubStreamingPullError {
32    pub error: tonic::Status,
33}
34
35impl InternalEvent for GcpPubsubStreamingPullError {
36    fn emit(self) {
37        error!(
38            message = "Failed to set up streaming pull.",
39            error = %self.error,
40            error_code = "failed_streaming_pull",
41            error_type = error_type::REQUEST_FAILED,
42            stage = error_stage::RECEIVING,
43        );
44
45        counter!(
46            "component_errors_total",
47            "error_code" => "failed_streaming_pull",
48            "error_type" => error_type::REQUEST_FAILED,
49            "stage" => error_stage::RECEIVING,
50        )
51        .increment(1);
52    }
53}
54
55#[derive(NamedInternalEvent)]
56pub struct GcpPubsubReceiveError {
57    pub error: tonic::Status,
58}
59
60impl InternalEvent for GcpPubsubReceiveError {
61    fn emit(self) {
62        error!(
63            message = "Failed to fetch events.",
64            error = %self.error,
65            error_code = "failed_fetching_events",
66            error_type = error_type::REQUEST_FAILED,
67            stage = error_stage::RECEIVING,
68        );
69
70        counter!(
71            "component_errors_total",
72            "error_code" => "failed_fetching_events",
73            "error_type" => error_type::REQUEST_FAILED,
74            "stage" => error_stage::RECEIVING,
75        )
76        .increment(1);
77    }
78}