vector/internal_events/
pulsar.rs1#![allow(dead_code)] #[cfg(feature = "sources-pulsar")]
4use metrics::Counter;
5use metrics::counter;
6use vector_lib::NamedInternalEvent;
7use vector_lib::internal_event::{
8 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
9};
10
11#[derive(Debug, NamedInternalEvent)]
12pub struct PulsarSendingError {
13 pub count: usize,
14 pub error: vector_lib::Error,
15}
16
17impl InternalEvent for PulsarSendingError {
18 fn emit(self) {
19 let reason = "A Pulsar sink generated an error.";
20 error!(
21 message = reason,
22 error = %self.error,
23 error_type = error_type::REQUEST_FAILED,
24 stage = error_stage::SENDING,
25 );
26 counter!(
27 "component_errors_total",
28 "error_type" => error_type::REQUEST_FAILED,
29 "stage" => error_stage::SENDING,
30 )
31 .increment(1);
32 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
33 count: self.count,
34 reason,
35 });
36 }
37}
38
39#[derive(NamedInternalEvent)]
40pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
41 pub property_field: F,
42}
43
44impl<F: std::fmt::Display> InternalEvent for PulsarPropertyExtractionError<F> {
45 fn emit(self) {
46 error!(
47 message = "Failed to extract properties. Value should be a map of String -> Bytes.",
48 error_code = "extracting_property",
49 error_type = error_type::PARSER_FAILED,
50 stage = error_stage::PROCESSING,
51 property_field = %self.property_field,
52 );
53 counter!(
54 "component_errors_total",
55 "error_code" => "extracting_property",
56 "error_type" => error_type::PARSER_FAILED,
57 "stage" => error_stage::PROCESSING,
58 )
59 .increment(1);
60 }
61}
62
63#[cfg(feature = "sources-pulsar")]
/// Which operation failed when a `PulsarErrorEvent` is emitted.
///
/// The variant selects both the log message and the error counter used by
/// the event's `emit` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PulsarErrorEventType {
    /// Reading a message failed.
    Read,
    /// Acknowledging a message failed.
    Ack,
    /// Negatively acknowledging a message failed.
    NAck,
}
69
/// Payload handed to `PulsarErrorEvent` when it is emitted.
#[cfg(feature = "sources-pulsar")]
pub struct PulsarErrorEventData {
    /// Error text; logged as the `error` field.
    pub msg: String,
    /// Which operation failed; selects the log message and counter.
    pub error_type: PulsarErrorEventType,
}
75
#[cfg(feature = "sources-pulsar")]
registered_event!(
    // Registered event carrying one `component_errors_total` counter handle
    // per failure kind; each handle has its own `error_code` tag.
    PulsarErrorEvent => {
        ack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        nack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "negative_acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        read_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
    }

    // Logs the failure described by `error` and increments the counter that
    // matches its `error_type`.
    fn emit(&self, error: PulsarErrorEventData) {
        // Split the payload once; `msg` is logged, `error_type` picks the arm.
        let PulsarErrorEventData { msg, error_type } = error;

        // Arms follow the same order as the counter handles declared above.
        match error_type {
            PulsarErrorEventType::Ack => {
                error!(
                    message = "Failed to acknowledge message.",
                    error = msg,
                    error_code = "acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                );

                self.ack_errors.increment(1);
            }
            PulsarErrorEventType::NAck => {
                error!(
                    message = "Failed to negatively acknowledge message.",
                    error = msg,
                    error_code = "negative_acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                );

                self.nack_errors.increment(1);
            }
            PulsarErrorEventType::Read => {
                error!(
                    message = "Failed to read message.",
                    error = msg,
                    error_code = "reading_message",
                    error_type = error_type::READER_FAILED,
                    stage = error_stage::RECEIVING,
                );

                self.read_errors.increment(1);
            }
        }
    }
);