vector/internal_events/
amqp.rs1#[cfg(feature = "sources-amqp")]
2pub mod source {
3 use metrics::counter;
4 use vector_lib::NamedInternalEvent;
5 use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
6
7 #[derive(Debug, NamedInternalEvent)]
8 pub struct AmqpBytesReceived {
9 pub byte_size: usize,
10 pub protocol: &'static str,
11 }
12
13 impl InternalEvent for AmqpBytesReceived {
14 fn emit(self) {
15 trace!(
16 message = "Bytes received.",
17 byte_size = %self.byte_size,
18 protocol = %self.protocol,
19 );
20 counter!(
21 "component_received_bytes_total",
22 "protocol" => self.protocol,
23 )
24 .increment(self.byte_size as u64);
25 }
26 }
27
28 #[derive(Debug, NamedInternalEvent)]
29 pub struct AmqpEventError {
30 pub error: lapin::Error,
31 }
32
33 impl InternalEvent for AmqpEventError {
34 fn emit(self) {
35 error!(message = "Failed to read message.",
36 error = ?self.error,
37 error_type = error_type::REQUEST_FAILED,
38 stage = error_stage::RECEIVING,
39 );
40 counter!(
41 "component_errors_total",
42 "error_type" => error_type::REQUEST_FAILED,
43 "stage" => error_stage::RECEIVING,
44 )
45 .increment(1);
46 }
47 }
48
49 #[derive(Debug, NamedInternalEvent)]
50 pub struct AmqpAckError {
51 pub error: lapin::Error,
52 }
53
54 impl InternalEvent for AmqpAckError {
55 fn emit(self) {
56 error!(message = "Unable to ack.",
57 error = ?self.error,
58 error_type = error_type::ACKNOWLEDGMENT_FAILED,
59 stage = error_stage::RECEIVING,
60 );
61 counter!(
62 "component_errors_total",
63 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
64 "stage" => error_stage::RECEIVING,
65 )
66 .increment(1);
67 }
68 }
69
70 #[derive(Debug, NamedInternalEvent)]
71 pub struct AmqpRejectError {
72 pub error: lapin::Error,
73 }
74
75 impl InternalEvent for AmqpRejectError {
76 fn emit(self) {
77 error!(message = "Unable to reject.",
78 error = ?self.error,
79 error_type = error_type::COMMAND_FAILED,
80 stage = error_stage::RECEIVING,
81 );
82 counter!(
83 "component_errors_total",
84 "error_type" => error_type::COMMAND_FAILED,
85 "stage" => error_stage::RECEIVING,
86 )
87 .increment(1);
88 }
89 }
90}