vector/internal_events/
pulsar.rs

1use metrics::counter;
2#[cfg(feature = "sources-pulsar")]
3use metrics::Counter;
4use vector_lib::internal_event::{
5    error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL,
6};
7
8#[derive(Debug)]
9pub struct PulsarSendingError {
10    pub count: usize,
11    pub error: vector_lib::Error,
12}
13
14impl InternalEvent for PulsarSendingError {
15    fn emit(self) {
16        let reason = "A Pulsar sink generated an error.";
17        error!(
18            message = reason,
19            error = %self.error,
20            error_type = error_type::REQUEST_FAILED,
21            stage = error_stage::SENDING,
22            internal_log_rate_limit = true,
23        );
24        counter!(
25            "component_errors_total",
26            "error_type" => error_type::REQUEST_FAILED,
27            "stage" => error_stage::SENDING,
28        )
29        .increment(1);
30        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
31            count: self.count,
32            reason,
33        });
34    }
35}
36
/// Internal event describing a failure to extract Pulsar message properties
/// from an event field.
///
/// `F` is whatever identifies the offending field; it only has to be
/// printable, because it is logged through its `Display` output.
///
/// `Debug` is derived so this event can be inspected like
/// `PulsarSendingError`; the derived impl is available whenever `F: Debug`.
#[derive(Debug)]
pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
    /// The field the properties were expected to be read from.
    pub property_field: F,
}
40
41impl<F: std::fmt::Display> InternalEvent for PulsarPropertyExtractionError<F> {
42    fn emit(self) {
43        error!(
44            message = "Failed to extract properties. Value should be a map of String -> Bytes.",
45            error_code = "extracting_property",
46            error_type = error_type::PARSER_FAILED,
47            stage = error_stage::PROCESSING,
48            property_field = %self.property_field,
49            internal_log_rate_limit = true,
50        );
51        counter!(
52            "component_errors_total",
53            "error_code" => "extracting_property",
54            "error_type" => error_type::PARSER_FAILED,
55            "stage" => error_stage::PROCESSING,
56        )
57        .increment(1);
58    }
59}
60
/// Which Pulsar source operation failed.
///
/// Selects the log message and the `component_errors_total` counter used
/// when a `PulsarErrorEvent` is emitted.
#[cfg(feature = "sources-pulsar")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PulsarErrorEventType {
    /// Reading a message failed.
    Read,
    /// Acknowledging a message failed.
    Ack,
    /// Negatively acknowledging a message failed.
    NAck,
}

/// Payload passed to `PulsarErrorEvent::emit`.
#[cfg(feature = "sources-pulsar")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PulsarErrorEventData {
    /// Error text, logged as the `error` field.
    pub msg: String,
    /// The operation that failed.
    pub error_type: PulsarErrorEventType,
}
73
// Error event for the Pulsar source. It holds one `component_errors_total`
// counter handle per failure kind; `emit` logs the failure and increments the
// handle that matches the reported kind.
#[cfg(feature = "sources-pulsar")]
registered_event!(
    PulsarErrorEvent => {
        // Failures while acknowledging a message.
        ack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        // Failures while negatively acknowledging a message.
        nack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "negative_acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        // Failures while reading a message.
        read_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
    }

    fn emit(&self, data: PulsarErrorEventData) {
        let PulsarErrorEventData { msg, error_type: kind } = data;

        // Each kind keeps its own `error!` call site; only the counter and
        // the literal fields differ between the arms.
        match kind {
            PulsarErrorEventType::Ack => {
                error!(
                    message = "Failed to acknowledge message.",
                    error = msg,
                    error_code = "acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.ack_errors.increment(1);
            }
            PulsarErrorEventType::NAck => {
                error!(
                    message = "Failed to negatively acknowledge message.",
                    error = msg,
                    error_code = "negative_acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.nack_errors.increment(1);
            }
            PulsarErrorEventType::Read => {
                error!(
                    message = "Failed to read message.",
                    error = msg,
                    error_code = "reading_message",
                    error_type = error_type::READER_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.read_errors.increment(1);
            }
        }
    }
);