vector/internal_events/
pulsar.rs1use metrics::counter;
2#[cfg(feature = "sources-pulsar")]
3use metrics::Counter;
4use vector_lib::internal_event::{
5 error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL,
6};
7
8#[derive(Debug)]
9pub struct PulsarSendingError {
10 pub count: usize,
11 pub error: vector_lib::Error,
12}
13
14impl InternalEvent for PulsarSendingError {
15 fn emit(self) {
16 let reason = "A Pulsar sink generated an error.";
17 error!(
18 message = reason,
19 error = %self.error,
20 error_type = error_type::REQUEST_FAILED,
21 stage = error_stage::SENDING,
22 internal_log_rate_limit = true,
23 );
24 counter!(
25 "component_errors_total",
26 "error_type" => error_type::REQUEST_FAILED,
27 "stage" => error_stage::SENDING,
28 )
29 .increment(1);
30 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
31 count: self.count,
32 reason,
33 });
34 }
35}
36
37pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
38 pub property_field: F,
39}
40
41impl<F: std::fmt::Display> InternalEvent for PulsarPropertyExtractionError<F> {
42 fn emit(self) {
43 error!(
44 message = "Failed to extract properties. Value should be a map of String -> Bytes.",
45 error_code = "extracting_property",
46 error_type = error_type::PARSER_FAILED,
47 stage = error_stage::PROCESSING,
48 property_field = %self.property_field,
49 internal_log_rate_limit = true,
50 );
51 counter!(
52 "component_errors_total",
53 "error_code" => "extracting_property",
54 "error_type" => error_type::PARSER_FAILED,
55 "stage" => error_stage::PROCESSING,
56 )
57 .increment(1);
58 }
59}
60
61#[cfg(feature = "sources-pulsar")]
62pub enum PulsarErrorEventType {
63 Read,
64 Ack,
65 NAck,
66}
67
68#[cfg(feature = "sources-pulsar")]
69pub struct PulsarErrorEventData {
70 pub msg: String,
71 pub error_type: PulsarErrorEventType,
72}
73
74#[cfg(feature = "sources-pulsar")]
75registered_event!(
76 PulsarErrorEvent => {
77 ack_errors: Counter = counter!(
78 "component_errors_total",
79 "error_code" => "acknowledge_message",
80 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
81 "stage" => error_stage::RECEIVING,
82 ),
83
84 nack_errors: Counter = counter!(
85 "component_errors_total",
86 "error_code" => "negative_acknowledge_message",
87 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
88 "stage" => error_stage::RECEIVING,
89 ),
90
91 read_errors: Counter = counter!(
92 "component_errors_total",
93 "error_code" => "reading_message",
94 "error_type" => error_type::READER_FAILED,
95 "stage" => error_stage::RECEIVING,
96 ),
97 }
98
99 fn emit(&self,error:PulsarErrorEventData) {
100 match error.error_type{
101 PulsarErrorEventType::Read => {
102 error!(
103 message = "Failed to read message.",
104 error = error.msg,
105 error_code = "reading_message",
106 error_type = error_type::READER_FAILED,
107 stage = error_stage::RECEIVING,
108 internal_log_rate_limit = true,
109 );
110
111 self.read_errors.increment(1_u64);
112 }
113 PulsarErrorEventType::Ack => {
114 error!(
115 message = "Failed to acknowledge message.",
116 error = error.msg,
117 error_code = "acknowledge_message",
118 error_type = error_type::ACKNOWLEDGMENT_FAILED,
119 stage = error_stage::RECEIVING,
120 internal_log_rate_limit = true,
121 );
122
123 self.ack_errors.increment(1_u64);
124 }
125 PulsarErrorEventType::NAck => {
126 error!(
127 message = "Failed to negatively acknowledge message.",
128 error = error.msg,
129 error_code = "negative_acknowledge_message",
130 error_type = error_type::ACKNOWLEDGMENT_FAILED,
131 stage = error_stage::RECEIVING,
132 internal_log_rate_limit = true,
133 );
134
135 self.nack_errors.increment(1_u64);
136 }
137 }
138 }
139);