vector/internal_events/
amqp.rs

1#[cfg(feature = "sources-amqp")]
2pub mod source {
3    use metrics::counter;
4    use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
5
6    #[derive(Debug)]
7    pub struct AmqpBytesReceived {
8        pub byte_size: usize,
9        pub protocol: &'static str,
10    }
11
12    impl InternalEvent for AmqpBytesReceived {
13        fn emit(self) {
14            trace!(
15                message = "Bytes received.",
16                byte_size = %self.byte_size,
17                protocol = %self.protocol,
18            );
19            counter!(
20                "component_received_bytes_total",
21                "protocol" => self.protocol,
22            )
23            .increment(self.byte_size as u64);
24        }
25    }
26
27    #[derive(Debug)]
28    pub struct AmqpEventError {
29        pub error: lapin::Error,
30    }
31
32    impl InternalEvent for AmqpEventError {
33        fn emit(self) {
34            error!(message = "Failed to read message.",
35                   error = ?self.error,
36                   error_type = error_type::REQUEST_FAILED,
37                   stage = error_stage::RECEIVING,
38                   internal_log_rate_limit = true,
39            );
40            counter!(
41                "component_errors_total",
42                "error_type" => error_type::REQUEST_FAILED,
43                "stage" => error_stage::RECEIVING,
44            )
45            .increment(1);
46        }
47    }
48
49    #[derive(Debug)]
50    pub struct AmqpAckError {
51        pub error: lapin::Error,
52    }
53
54    impl InternalEvent for AmqpAckError {
55        fn emit(self) {
56            error!(message = "Unable to ack.",
57                   error = ?self.error,
58                   error_type = error_type::ACKNOWLEDGMENT_FAILED,
59                   stage = error_stage::RECEIVING,
60                   internal_log_rate_limit = true,
61            );
62            counter!(
63                "component_errors_total",
64                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
65                "stage" => error_stage::RECEIVING,
66            )
67            .increment(1);
68        }
69    }
70
71    #[derive(Debug)]
72    pub struct AmqpRejectError {
73        pub error: lapin::Error,
74    }
75
76    impl InternalEvent for AmqpRejectError {
77        fn emit(self) {
78            error!(message = "Unable to reject.",
79                   error = ?self.error,
80                   error_type = error_type::COMMAND_FAILED,
81                   stage = error_stage::RECEIVING,
82                   internal_log_rate_limit = true,
83            );
84            counter!(
85                "component_errors_total",
86                "error_type" => error_type::COMMAND_FAILED,
87                "stage" => error_stage::RECEIVING,
88            )
89            .increment(1);
90        }
91    }
92}