vector/internal_events/
amqp.rs

#[cfg(feature = "sources-amqp")]
/// Internal events gated behind the `sources-amqp` feature.
///
/// Each event writes one structured log record and then bumps one counter
/// when it is emitted.
pub mod source {
    use metrics::counter;
    use vector_lib::{
        NamedInternalEvent,
        internal_event::{InternalEvent, error_stage, error_type},
    };

    /// Reports a number of bytes received over a given protocol.
    #[derive(Debug, NamedInternalEvent)]
    pub struct AmqpBytesReceived {
        /// Number of bytes to add to the received-bytes counter.
        pub byte_size: usize,
        /// Value used for the `protocol` log field and metric label.
        pub protocol: &'static str,
    }

    impl InternalEvent for AmqpBytesReceived {
        /// Logs the byte count at trace level, then adds it to
        /// `component_received_bytes_total` labelled with the protocol.
        fn emit(self) {
            let Self {
                byte_size,
                protocol,
            } = self;

            trace!(
                message = "Bytes received.",
                byte_size = %byte_size,
                protocol = %protocol,
            );

            let received_bytes = counter!(
                "component_received_bytes_total",
                "protocol" => protocol,
            );
            // Widening cast on any target whose `usize` is at most 64 bits.
            received_bytes.increment(byte_size as u64);
        }
    }

    /// Reports a `lapin::Error` as a failure to read a message.
    #[derive(Debug, NamedInternalEvent)]
    pub struct AmqpEventError {
        /// The underlying error, logged with its `Debug` representation.
        pub error: lapin::Error,
    }

    impl InternalEvent for AmqpEventError {
        /// Logs the error, then increments `component_errors_total` with the
        /// `REQUEST_FAILED` error type and the `RECEIVING` stage.
        fn emit(self) {
            let Self { error: cause } = self;

            error!(
                message = "Failed to read message.",
                error = ?cause,
                error_type = error_type::REQUEST_FAILED,
                stage = error_stage::RECEIVING,
            );

            let failures = counter!(
                "component_errors_total",
                "error_type" => error_type::REQUEST_FAILED,
                "stage" => error_stage::RECEIVING,
            );
            failures.increment(1);
        }
    }

    /// Reports a `lapin::Error` as a failure to ack.
    #[derive(Debug, NamedInternalEvent)]
    pub struct AmqpAckError {
        /// The underlying error, logged with its `Debug` representation.
        pub error: lapin::Error,
    }

    impl InternalEvent for AmqpAckError {
        /// Logs the error, then increments `component_errors_total` with the
        /// `ACKNOWLEDGMENT_FAILED` error type and the `RECEIVING` stage.
        fn emit(self) {
            let Self { error: cause } = self;

            error!(
                message = "Unable to ack.",
                error = ?cause,
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::RECEIVING,
            );

            let failures = counter!(
                "component_errors_total",
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::RECEIVING,
            );
            failures.increment(1);
        }
    }

    /// Reports a `lapin::Error` as a failure to reject.
    #[derive(Debug, NamedInternalEvent)]
    pub struct AmqpRejectError {
        /// The underlying error, logged with its `Debug` representation.
        pub error: lapin::Error,
    }

    impl InternalEvent for AmqpRejectError {
        /// Logs the error, then increments `component_errors_total` with the
        /// `COMMAND_FAILED` error type and the `RECEIVING` stage.
        fn emit(self) {
            let Self { error: cause } = self;

            error!(
                message = "Unable to reject.",
                error = ?cause,
                error_type = error_type::COMMAND_FAILED,
                stage = error_stage::RECEIVING,
            );

            let failures = counter!(
                "component_errors_total",
                "error_type" => error_type::COMMAND_FAILED,
                "stage" => error_stage::RECEIVING,
            );
            failures.increment(1);
        }
    }
}