vector/internal_events/
aws_sqs.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use metrics::counter;
4#[cfg(feature = "sources-aws_s3")]
5pub use s3::*;
6use vector_lib::internal_event::InternalEvent;
7#[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs"))]
8use vector_lib::internal_event::{error_stage, error_type};
9
#[cfg(feature = "sources-aws_s3")]
mod s3 {
    use aws_sdk_sqs::types::{
        BatchResultErrorEntry, DeleteMessageBatchRequestEntry, DeleteMessageBatchResultEntry,
        SendMessageBatchRequestEntry, SendMessageBatchResultEntry,
    };

    use super::*;
    use crate::sources::aws_s3::sqs::ProcessingError;

    /// Renders a sequence of string-like items as a single `", "`-separated string.
    ///
    /// Used by the events in this module to format lists of message IDs for log output.
    fn comma_separated<I>(items: I) -> String
    where
        I: IntoIterator,
        I::Item: std::borrow::Borrow<str>,
    {
        items.into_iter().collect::<Vec<_>>().join(", ")
    }

    /// Failure event for a single SQS message that could not be processed.
    #[derive(Debug)]
    pub struct SqsMessageProcessingError<'a> {
        /// ID of the message that failed; included in the log line.
        pub message_id: &'a str,
        /// The processing error; rendered with `Display` in the log line.
        pub error: &'a ProcessingError,
    }

    impl InternalEvent for SqsMessageProcessingError<'_> {
        /// Logs the failure at error level, then adds one to `component_errors_total`.
        fn emit(self) {
            let Self {
                message_id,
                error: cause,
            } = self;

            error!(
                message = "Failed to process SQS message.",
                message_id = %message_id,
                error = %cause,
                error_code = "failed_processing_sqs_message",
                error_type = error_type::PARSER_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );

            let errors_total = counter!(
                "component_errors_total",
                "error_code" => "failed_processing_sqs_message",
                "error_type" => error_type::PARSER_FAILED,
                "stage" => error_stage::PROCESSING,
            );
            errors_total.increment(1);
        }
    }

    /// Success event carrying the result entries of deleted SQS messages.
    #[derive(Debug)]
    pub struct SqsMessageDeleteSucceeded {
        /// Result entries for the deleted messages; their `id`s are logged.
        pub message_ids: Vec<DeleteMessageBatchResultEntry>,
    }

    impl InternalEvent for SqsMessageDeleteSucceeded {
        /// Traces the deleted IDs and adds the number of entries to
        /// `sqs_message_delete_succeeded_total`.
        fn emit(self) {
            let Self {
                message_ids: deleted,
            } = self;

            trace!(
                message = "Deleted SQS message(s).",
                message_ids = %comma_separated(deleted.iter().map(|entry| entry.id.as_str())),
            );

            let deleted_total = counter!("sqs_message_delete_succeeded_total");
            deleted_total.increment(deleted.len() as u64);
        }
    }

    /// Failure event for a delete batch in which some entries were reported as errors.
    #[derive(Debug)]
    pub struct SqsMessageDeletePartialError {
        /// Error entries; each one is logged as `id/code`.
        pub entries: Vec<BatchResultErrorEntry>,
    }

    impl InternalEvent for SqsMessageDeletePartialError {
        /// Logs the failed entries at error level, then adds one to `component_errors_total`
        /// (a single increment regardless of how many entries are present).
        fn emit(self) {
            let Self { entries: failed } = self;

            error!(
                message = "Deletion of SQS message(s) failed.",
                message_ids = %comma_separated(
                    failed.iter().map(|entry| format!("{}/{}", entry.id, entry.code))
                ),
                error_code = "failed_deleting_some_sqs_messages",
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );

            let errors_total = counter!(
                "component_errors_total",
                "error_code" => "failed_deleting_some_sqs_messages",
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::PROCESSING,
            );
            errors_total.increment(1);
        }
    }

    /// Failure event for a delete batch request that failed as a whole.
    #[derive(Debug)]
    pub struct SqsMessageDeleteBatchError<E> {
        /// The request entries that were part of the failed batch; their `id`s are logged.
        pub entries: Vec<DeleteMessageBatchRequestEntry>,
        /// The error for the batch; must implement `Display` for the event to be emitted.
        pub error: E,
    }

    impl<E> InternalEvent for SqsMessageDeleteBatchError<E>
    where
        E: std::fmt::Display,
    {
        /// Logs the affected IDs and the error at error level, then adds one to
        /// `component_errors_total`.
        fn emit(self) {
            let Self {
                entries: requested,
                error: cause,
            } = self;

            error!(
                message = "Deletion of SQS message(s) failed.",
                message_ids = %comma_separated(requested.iter().map(|entry| entry.id.as_str())),
                error = %cause,
                error_code = "failed_deleting_all_sqs_messages",
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );

            let errors_total = counter!(
                "component_errors_total",
                "error_code" => "failed_deleting_all_sqs_messages",
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::PROCESSING,
            );
            errors_total.increment(1);
        }
    }

    /// Success event carrying the batch-send result entries of deferred SQS messages.
    #[derive(Debug)]
    pub struct SqsMessageSentSucceeded {
        /// Result entries for the sent messages; their `id`s are logged.
        pub message_ids: Vec<SendMessageBatchResultEntry>,
    }

    impl InternalEvent for SqsMessageSentSucceeded {
        /// Traces the deferred IDs and adds the number of entries to
        /// `sqs_message_defer_succeeded_total`.
        fn emit(self) {
            let Self {
                message_ids: deferred,
            } = self;

            trace!(
                message = "Deferred SQS message(s).",
                message_ids = %comma_separated(deferred.iter().map(|entry| entry.id.as_str())),
            );

            let deferred_total = counter!("sqs_message_defer_succeeded_total");
            deferred_total.increment(deferred.len() as u64);
        }
    }

    /// Failure event for a send batch in which some entries were reported as errors.
    #[derive(Debug)]
    pub struct SqsMessageSentPartialError {
        /// Error entries; each one is logged as `id/code`.
        pub entries: Vec<BatchResultErrorEntry>,
    }

    impl InternalEvent for SqsMessageSentPartialError {
        /// Logs the failed entries at error level, then adds one to `component_errors_total`
        /// (a single increment regardless of how many entries are present).
        fn emit(self) {
            let Self { entries: failed } = self;

            error!(
                message = "Sending of deferred SQS message(s) failed.",
                message_ids = %comma_separated(
                    failed.iter().map(|entry| format!("{}/{}", entry.id, entry.code))
                ),
                error_code = "failed_deferring_some_sqs_messages",
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );

            let errors_total = counter!(
                "component_errors_total",
                "error_code" => "failed_deferring_some_sqs_messages",
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::PROCESSING,
            );
            errors_total.increment(1);
        }
    }

    /// Failure event for a send batch request that failed as a whole.
    #[derive(Debug)]
    pub struct SqsMessageSendBatchError<E> {
        /// The request entries that were part of the failed batch; their `id`s are logged.
        pub entries: Vec<SendMessageBatchRequestEntry>,
        /// The error for the batch; must implement `Display` for the event to be emitted.
        pub error: E,
    }

    impl<E> InternalEvent for SqsMessageSendBatchError<E>
    where
        E: std::fmt::Display,
    {
        /// Logs the affected IDs and the error at error level, then adds one to
        /// `component_errors_total`.
        fn emit(self) {
            let Self {
                entries: requested,
                error: cause,
            } = self;

            error!(
                message = "Sending of deferred SQS message(s) failed.",
                message_ids = %comma_separated(requested.iter().map(|entry| entry.id.as_str())),
                error = %cause,
                error_code = "failed_deferring_all_sqs_messages",
                error_type = error_type::ACKNOWLEDGMENT_FAILED,
                stage = error_stage::PROCESSING,
                internal_log_rate_limit = true,
            );

            let errors_total = counter!(
                "component_errors_total",
                "error_code" => "failed_deferring_all_sqs_messages",
                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
                "stage" => error_stage::PROCESSING,
            );
            errors_total.increment(1);
        }
    }
}
195
196#[derive(Debug)]
197pub struct SqsMessageReceiveError<'a, E> {
198    pub error: &'a E,
199}
200
201impl<E: std::fmt::Display> InternalEvent for SqsMessageReceiveError<'_, E> {
202    fn emit(self) {
203        error!(
204            message = "Failed to fetch SQS events.",
205            error = %self.error,
206            error_code = "failed_fetching_sqs_events",
207            error_type = error_type::REQUEST_FAILED,
208            stage = error_stage::RECEIVING,
209            internal_log_rate_limit = true,
210        );
211        counter!(
212            "component_errors_total",
213            "error_code" => "failed_fetching_sqs_events",
214            "error_type" => error_type::REQUEST_FAILED,
215            "stage" => error_stage::RECEIVING,
216        )
217        .increment(1);
218    }
219}
220
221#[derive(Debug)]
222pub struct SqsMessageReceiveSucceeded {
223    pub count: usize,
224}
225
226impl InternalEvent for SqsMessageReceiveSucceeded {
227    fn emit(self) {
228        trace!(message = "Received SQS messages.", count = %self.count);
229        counter!("sqs_message_receive_succeeded_total").increment(1);
230        counter!("sqs_message_received_messages_total").increment(self.count as u64);
231    }
232}
233
234#[derive(Debug)]
235pub struct SqsMessageProcessingSucceeded<'a> {
236    pub message_id: &'a str,
237}
238
239impl InternalEvent for SqsMessageProcessingSucceeded<'_> {
240    fn emit(self) {
241        trace!(message = "Processed SQS message successfully.", message_id = %self.message_id);
242        counter!("sqs_message_processing_succeeded_total").increment(1);
243    }
244}
245
246// AWS SQS source
247
/// Failure event for deleting SQS events.
///
/// Only compiled when the `sources-aws_sqs` feature is enabled.
#[cfg(feature = "sources-aws_sqs")]
#[derive(Debug)]
pub struct SqsMessageDeleteError<'a, E> {
    /// Borrowed error that is rendered with `Display` in the log line.
    pub error: &'a E,
}

#[cfg(feature = "sources-aws_sqs")]
impl<E> InternalEvent for SqsMessageDeleteError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the failure at error level, then adds one to `component_errors_total`.
    ///
    /// Note: neither the log line nor the counter carries an `error_code` here.
    fn emit(self) {
        let Self { error: cause } = self;

        error!(
            message = "Failed to delete SQS events.",
            error = %cause,
            error_type = error_type::WRITER_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_type" => error_type::WRITER_FAILED,
            "stage" => error_stage::PROCESSING,
        );
        errors_total.increment(1);
    }
}
272
// AWS S3 source
274
275#[derive(Debug)]
276pub struct SqsS3EventRecordInvalidEventIgnored<'a> {
277    pub bucket: &'a str,
278    pub key: &'a str,
279    pub kind: &'a str,
280    pub name: &'a str,
281}
282
283impl InternalEvent for SqsS3EventRecordInvalidEventIgnored<'_> {
284    fn emit(self) {
285        warn!(message = "Ignored S3 record in SQS message for an event that was not ObjectCreated.",
286            bucket = %self.bucket, key = %self.key, kind = %self.kind, name = %self.name);
287        counter!("sqs_s3_event_record_ignored_total", "ignore_type" => "invalid_event_kind")
288            .increment(1);
289    }
290}