vector/internal_events/
amqp.rs1#[cfg(feature = "sources-amqp")]
2pub mod source {
3 use metrics::counter;
4 use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
5
6 #[derive(Debug)]
7 pub struct AmqpBytesReceived {
8 pub byte_size: usize,
9 pub protocol: &'static str,
10 }
11
12 impl InternalEvent for AmqpBytesReceived {
13 fn emit(self) {
14 trace!(
15 message = "Bytes received.",
16 byte_size = %self.byte_size,
17 protocol = %self.protocol,
18 );
19 counter!(
20 "component_received_bytes_total",
21 "protocol" => self.protocol,
22 )
23 .increment(self.byte_size as u64);
24 }
25 }
26
27 #[derive(Debug)]
28 pub struct AmqpEventError {
29 pub error: lapin::Error,
30 }
31
32 impl InternalEvent for AmqpEventError {
33 fn emit(self) {
34 error!(message = "Failed to read message.",
35 error = ?self.error,
36 error_type = error_type::REQUEST_FAILED,
37 stage = error_stage::RECEIVING,
38 internal_log_rate_limit = true,
39 );
40 counter!(
41 "component_errors_total",
42 "error_type" => error_type::REQUEST_FAILED,
43 "stage" => error_stage::RECEIVING,
44 )
45 .increment(1);
46 }
47 }
48
49 #[derive(Debug)]
50 pub struct AmqpAckError {
51 pub error: lapin::Error,
52 }
53
54 impl InternalEvent for AmqpAckError {
55 fn emit(self) {
56 error!(message = "Unable to ack.",
57 error = ?self.error,
58 error_type = error_type::ACKNOWLEDGMENT_FAILED,
59 stage = error_stage::RECEIVING,
60 internal_log_rate_limit = true,
61 );
62 counter!(
63 "component_errors_total",
64 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
65 "stage" => error_stage::RECEIVING,
66 )
67 .increment(1);
68 }
69 }
70
71 #[derive(Debug)]
72 pub struct AmqpRejectError {
73 pub error: lapin::Error,
74 }
75
76 impl InternalEvent for AmqpRejectError {
77 fn emit(self) {
78 error!(message = "Unable to reject.",
79 error = ?self.error,
80 error_type = error_type::COMMAND_FAILED,
81 stage = error_stage::RECEIVING,
82 internal_log_rate_limit = true,
83 );
84 counter!(
85 "component_errors_total",
86 "error_type" => error_type::COMMAND_FAILED,
87 "stage" => error_stage::RECEIVING,
88 )
89 .increment(1);
90 }
91 }
92}