vector/internal_events/
amqp.rs

1#[cfg(feature = "sources-amqp")]
2pub mod source {
3    use metrics::counter;
4    use vector_lib::internal_event::InternalEvent;
5    use vector_lib::internal_event::{error_stage, error_type};
6
7    #[derive(Debug)]
8    pub struct AmqpBytesReceived {
9        pub byte_size: usize,
10        pub protocol: &'static str,
11    }
12
13    impl InternalEvent for AmqpBytesReceived {
14        fn emit(self) {
15            trace!(
16                message = "Bytes received.",
17                byte_size = %self.byte_size,
18                protocol = %self.protocol,
19            );
20            counter!(
21                "component_received_bytes_total",
22                "protocol" => self.protocol,
23            )
24            .increment(self.byte_size as u64);
25        }
26    }
27
28    #[derive(Debug)]
29    pub struct AmqpEventError {
30        pub error: lapin::Error,
31    }
32
33    impl InternalEvent for AmqpEventError {
34        fn emit(self) {
35            error!(message = "Failed to read message.",
36                   error = ?self.error,
37                   error_type = error_type::REQUEST_FAILED,
38                   stage = error_stage::RECEIVING,
39                   internal_log_rate_limit = true,
40            );
41            counter!(
42                "component_errors_total",
43                "error_type" => error_type::REQUEST_FAILED,
44                "stage" => error_stage::RECEIVING,
45            )
46            .increment(1);
47        }
48    }
49
50    #[derive(Debug)]
51    pub struct AmqpAckError {
52        pub error: lapin::Error,
53    }
54
55    impl InternalEvent for AmqpAckError {
56        fn emit(self) {
57            error!(message = "Unable to ack.",
58                   error = ?self.error,
59                   error_type = error_type::ACKNOWLEDGMENT_FAILED,
60                   stage = error_stage::RECEIVING,
61                   internal_log_rate_limit = true,
62            );
63            counter!(
64                "component_errors_total",
65                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
66                "stage" => error_stage::RECEIVING,
67            )
68            .increment(1);
69        }
70    }
71
72    #[derive(Debug)]
73    pub struct AmqpRejectError {
74        pub error: lapin::Error,
75    }
76
77    impl InternalEvent for AmqpRejectError {
78        fn emit(self) {
79            error!(message = "Unable to reject.",
80                   error = ?self.error,
81                   error_type = error_type::COMMAND_FAILED,
82                   stage = error_stage::RECEIVING,
83                   internal_log_rate_limit = true,
84            );
85            counter!(
86                "component_errors_total",
87                "error_type" => error_type::COMMAND_FAILED,
88                "stage" => error_stage::RECEIVING,
89            )
90            .increment(1);
91        }
92    }
93}