1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use metrics::counter;
#[cfg(feature = "sources-pulsar")]
use metrics::Counter;
use vector_lib::internal_event::{
    error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL,
};

/// Internal event describing an error produced by the Pulsar sink while sending.
///
/// When emitted, the error is logged, `component_errors_total` is incremented,
/// and `count` events are reported as unintentionally dropped.
#[derive(Debug)]
pub struct PulsarSendingError {
    /// Number of events reported as dropped through `ComponentEventsDropped`.
    pub count: usize,
    /// The underlying error; rendered with `Display` in the log's `error` field.
    pub error: vector_lib::Error,
}

impl InternalEvent for PulsarSendingError {
    /// Reports a Pulsar sink send failure.
    ///
    /// Writes a rate-limited error log, increments `component_errors_total`
    /// (labelled with the `REQUEST_FAILED` error type and the `SENDING` stage),
    /// and finally emits `ComponentEventsDropped::<UNINTENTIONAL>` for `count`
    /// events.
    fn emit(self) {
        // The same text serves as the log message and as the drop reason.
        const REASON: &str = "A Pulsar sink generated an error.";

        let Self { count, error } = self;

        error!(
            message = REASON,
            error = %error,
            error_type = error_type::REQUEST_FAILED,
            stage = error_stage::SENDING,
            internal_log_rate_limit = true,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_type" => error_type::REQUEST_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);

        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
            count,
            reason: REASON,
        });
    }
}

/// Internal event describing a failure to extract Pulsar message properties
/// from an event field.
///
/// `F` is the type used to name the offending field; it only needs to be
/// displayable so it can be included in the error log.
#[derive(Debug)]
pub struct PulsarPropertyExtractionError<F: std::fmt::Display> {
    /// The field the properties were read from; logged as `property_field`.
    pub property_field: F,
}

impl<F> InternalEvent for PulsarPropertyExtractionError<F>
where
    F: std::fmt::Display,
{
    /// Reports a property extraction failure.
    ///
    /// Writes a rate-limited error log that names the offending field, then
    /// increments `component_errors_total` labelled with the
    /// `extracting_property` error code, the `PARSER_FAILED` error type and the
    /// `PROCESSING` stage.
    fn emit(self) {
        // Shared by the log line and the metric label so the two cannot drift.
        const ERROR_CODE: &str = "extracting_property";

        let Self { property_field } = self;

        error!(
            message = "Failed to extract properties. Value should be a map of String -> Bytes.",
            error_code = ERROR_CODE,
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::PROCESSING,
            property_field = %property_field,
            internal_log_rate_limit = true,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_code" => ERROR_CODE,
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::PROCESSING,
        );
        errors_total.increment(1);
    }
}

/// The kind of failure carried by a `PulsarErrorEventData`.
///
/// Only compiled when the `sources-pulsar` feature is enabled.
#[cfg(feature = "sources-pulsar")]
pub enum PulsarErrorEventType {
    /// Reading a message failed.
    Read,
    /// Acknowledging a message failed.
    Ack,
    /// Negatively acknowledging a message failed.
    NAck,
}

/// Payload passed to `PulsarErrorEvent` when it is emitted.
///
/// Only compiled when the `sources-pulsar` feature is enabled.
#[cfg(feature = "sources-pulsar")]
pub struct PulsarErrorEventData {
    /// Error text; logged as the `error` field.
    pub msg: String,
    /// Which operation failed; selects the log message and the counter that is incremented.
    pub error_type: PulsarErrorEventType,
}

// `PulsarErrorEvent`: reports read / ack / nack failures, one
// `component_errors_total` counter handle per failure kind.
//
// NOTE(review): the handles are presumably created once when the event is
// registered and reused on every emit; confirm against the
// `registered_event!` definition.
//
// Plain `//` comments are used on purpose: doc comments would become
// `#[doc]` tokens inside the macro input.
#[cfg(feature = "sources-pulsar")]
registered_event!(
    PulsarErrorEvent => {
        // Failures while acknowledging a message.
        ack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        // Failures while negatively acknowledging a message.
        nack_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "negative_acknowledge_message",
            "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
            "stage" => error_stage::RECEIVING,
        ),

        // Failures while reading a message.
        read_errors: Counter = counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        ),
    }

    fn emit(&self, failure: PulsarErrorEventData) {
        let PulsarErrorEventData { msg, error_type: kind } = failure;

        // Each arm keeps its own `error!` call so every failure kind remains a
        // distinct log call site.
        match kind {
            PulsarErrorEventType::Ack => {
                error!(
                    message = "Failed to acknowledge message.",
                    error = msg,
                    error_code = "acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.ack_errors.increment(1);
            }
            PulsarErrorEventType::NAck => {
                error!(
                    message = "Failed to negatively acknowledge message.",
                    error = msg,
                    error_code = "negative_acknowledge_message",
                    error_type = error_type::ACKNOWLEDGMENT_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.nack_errors.increment(1);
            }
            PulsarErrorEventType::Read => {
                error!(
                    message = "Failed to read message.",
                    error = msg,
                    error_code = "reading_message",
                    error_type = error_type::READER_FAILED,
                    stage = error_stage::RECEIVING,
                    internal_log_rate_limit = true,
                );

                self.read_errors.increment(1);
            }
        }
    }
);