vector/internal_events/
pulsar.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3#[cfg(feature = "sources-pulsar")]
4use metrics::Counter;
5use metrics::counter;
6use vector_lib::internal_event::{
7    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8};
9
10#[derive(Debug)]
11pub struct PulsarSendingError {
12    pub count: usize,
13    pub error: vector_lib::Error,
14}
15
16impl InternalEvent for PulsarSendingError {
17    fn emit(self) {
18        let reason = "A Pulsar sink generated an error.";
19        error!(
20            message = reason,
21            error = %self.error,
22            error_type = error_type::REQUEST_FAILED,
23            stage = error_stage::SENDING,
24            internal_log_rate_limit = true,
25        );
26        counter!(
27            "component_errors_total",
28            "error_type" => error_type::REQUEST_FAILED,
29            "stage" => error_stage::SENDING,
30        )
31        .increment(1);
32        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
33            count: self.count,
34            reason,
35        });
36    }
37}
38
/// Internal event emitted when properties could not be extracted from a
/// field (the value is expected to be a map of String -> Bytes).
///
/// Derives `Debug` so it can be formatted for diagnostics, matching
/// `PulsarSendingError`; the derived impl is available whenever `F: Debug`.
#[derive(Debug)]
pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
    /// The field the properties were read from; logged via its `Display` impl.
    pub property_field: F,
}
42
43impl<F: std::fmt::Display> InternalEvent for PulsarPropertyExtractionError<F> {
44    fn emit(self) {
45        error!(
46            message = "Failed to extract properties. Value should be a map of String -> Bytes.",
47            error_code = "extracting_property",
48            error_type = error_type::PARSER_FAILED,
49            stage = error_stage::PROCESSING,
50            property_field = %self.property_field,
51            internal_log_rate_limit = true,
52        );
53        counter!(
54            "component_errors_total",
55            "error_code" => "extracting_property",
56            "error_type" => error_type::PARSER_FAILED,
57            "stage" => error_stage::PROCESSING,
58        )
59        .increment(1);
60    }
61}
62
/// Which Pulsar source operation failed; selects the log message and the
/// counter used by `PulsarErrorEvent`.
///
/// Derives the common traits for a fieldless enum so values can be logged,
/// copied, and compared.
#[cfg(feature = "sources-pulsar")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PulsarErrorEventType {
    /// Reading a message failed.
    Read,
    /// Acknowledging a message failed.
    Ack,
    /// Negatively acknowledging a message failed.
    NAck,
}
69
/// Payload handed to `PulsarErrorEvent` when a source-side error is emitted.
#[cfg(feature = "sources-pulsar")]
pub struct PulsarErrorEventData {
    /// Error text recorded in the `error` field of the log line.
    pub msg: String,
    /// The operation that failed; decides which message and counter are used.
    pub error_type: PulsarErrorEventType,
}
75
#[cfg(feature = "sources-pulsar")]
registered_event!(
    // One `component_errors_total` counter per failure kind; the counters
    // differ in their `error_code` / `error_type` labels.
    PulsarErrorEvent => {
        ack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        nack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "negative_acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        read_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
    }

    // Logs the failure carried by `data` and increments the counter that
    // matches its kind. Each kind keeps its own `error!` call.
    fn emit(&self, data: PulsarErrorEventData) {
        let PulsarErrorEventData {
            msg,
            error_type: kind,
        } = data;

        match kind {
            PulsarErrorEventType::Ack => {
                error!(
                    message = "Failed to acknowledge message.",
                    error = msg,
                    error_code = "acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.ack_errors.increment(1);
            }
            PulsarErrorEventType::NAck => {
                error!(
                    message = "Failed to negatively acknowledge message.",
                    error = msg,
                    error_code = "negative_acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.nack_errors.increment(1);
            }
            PulsarErrorEventType::Read => {
                error!(
                    message = "Failed to read message.",
                    error = msg,
                    error_code = "reading_message",
                    error_type = error_type::READER_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.read_errors.increment(1);
            }
        }
    }
);