vector/internal_events/
amqp.rs1#[cfg(feature = "sources-amqp")]
2pub mod source {
3 use metrics::counter;
4 use vector_lib::internal_event::InternalEvent;
5 use vector_lib::internal_event::{error_stage, error_type};
6
7 #[derive(Debug)]
8 pub struct AmqpBytesReceived {
9 pub byte_size: usize,
10 pub protocol: &'static str,
11 }
12
13 impl InternalEvent for AmqpBytesReceived {
14 fn emit(self) {
15 trace!(
16 message = "Bytes received.",
17 byte_size = %self.byte_size,
18 protocol = %self.protocol,
19 );
20 counter!(
21 "component_received_bytes_total",
22 "protocol" => self.protocol,
23 )
24 .increment(self.byte_size as u64);
25 }
26 }
27
28 #[derive(Debug)]
29 pub struct AmqpEventError {
30 pub error: lapin::Error,
31 }
32
33 impl InternalEvent for AmqpEventError {
34 fn emit(self) {
35 error!(message = "Failed to read message.",
36 error = ?self.error,
37 error_type = error_type::REQUEST_FAILED,
38 stage = error_stage::RECEIVING,
39 internal_log_rate_limit = true,
40 );
41 counter!(
42 "component_errors_total",
43 "error_type" => error_type::REQUEST_FAILED,
44 "stage" => error_stage::RECEIVING,
45 )
46 .increment(1);
47 }
48 }
49
50 #[derive(Debug)]
51 pub struct AmqpAckError {
52 pub error: lapin::Error,
53 }
54
55 impl InternalEvent for AmqpAckError {
56 fn emit(self) {
57 error!(message = "Unable to ack.",
58 error = ?self.error,
59 error_type = error_type::ACKNOWLEDGMENT_FAILED,
60 stage = error_stage::RECEIVING,
61 internal_log_rate_limit = true,
62 );
63 counter!(
64 "component_errors_total",
65 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
66 "stage" => error_stage::RECEIVING,
67 )
68 .increment(1);
69 }
70 }
71
72 #[derive(Debug)]
73 pub struct AmqpRejectError {
74 pub error: lapin::Error,
75 }
76
77 impl InternalEvent for AmqpRejectError {
78 fn emit(self) {
79 error!(message = "Unable to reject.",
80 error = ?self.error,
81 error_type = error_type::COMMAND_FAILED,
82 stage = error_stage::RECEIVING,
83 internal_log_rate_limit = true,
84 );
85 counter!(
86 "component_errors_total",
87 "error_type" => error_type::COMMAND_FAILED,
88 "stage" => error_stage::RECEIVING,
89 )
90 .increment(1);
91 }
92 }
93}