1#![allow(dead_code)] use metrics::counter;
4#[cfg(feature = "sources-aws_s3")]
5pub use s3::*;
6use vector_lib::internal_event::InternalEvent;
7#[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs"))]
8use vector_lib::internal_event::{error_stage, error_type};
9
10#[cfg(feature = "sources-aws_s3")]
11mod s3 {
12 use aws_sdk_sqs::types::{
13 BatchResultErrorEntry, DeleteMessageBatchRequestEntry, DeleteMessageBatchResultEntry,
14 SendMessageBatchRequestEntry, SendMessageBatchResultEntry,
15 };
16
17 use super::*;
18 use crate::sources::aws_s3::sqs::ProcessingError;
19
20 #[derive(Debug)]
21 pub struct SqsMessageProcessingError<'a> {
22 pub message_id: &'a str,
23 pub error: &'a ProcessingError,
24 }
25
26 impl InternalEvent for SqsMessageProcessingError<'_> {
27 fn emit(self) {
28 error!(
29 message = "Failed to process SQS message.",
30 message_id = %self.message_id,
31 error = %self.error,
32 error_code = "failed_processing_sqs_message",
33 error_type = error_type::PARSER_FAILED,
34 stage = error_stage::PROCESSING,
35 internal_log_rate_limit = true,
36 );
37 counter!(
38 "component_errors_total",
39 "error_code" => "failed_processing_sqs_message",
40 "error_type" => error_type::PARSER_FAILED,
41 "stage" => error_stage::PROCESSING,
42 )
43 .increment(1);
44 }
45 }
46
47 #[derive(Debug)]
48 pub struct SqsMessageDeleteSucceeded {
49 pub message_ids: Vec<DeleteMessageBatchResultEntry>,
50 }
51
52 impl InternalEvent for SqsMessageDeleteSucceeded {
53 fn emit(self) {
54 trace!(message = "Deleted SQS message(s).",
55 message_ids = %self.message_ids.iter()
56 .map(|x| x.id.as_str())
57 .collect::<Vec<_>>()
58 .join(", "));
59 counter!("sqs_message_delete_succeeded_total").increment(self.message_ids.len() as u64);
60 }
61 }
62
63 #[derive(Debug)]
64 pub struct SqsMessageDeletePartialError {
65 pub entries: Vec<BatchResultErrorEntry>,
66 }
67
68 impl InternalEvent for SqsMessageDeletePartialError {
69 fn emit(self) {
70 error!(
71 message = "Deletion of SQS message(s) failed.",
72 message_ids = %self.entries.iter()
73 .map(|x| format!("{}/{}", x.id, x.code))
74 .collect::<Vec<_>>()
75 .join(", "),
76 error_code = "failed_deleting_some_sqs_messages",
77 error_type = error_type::ACKNOWLEDGMENT_FAILED,
78 stage = error_stage::PROCESSING,
79 internal_log_rate_limit = true,
80 );
81 counter!(
82 "component_errors_total",
83 "error_code" => "failed_deleting_some_sqs_messages",
84 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
85 "stage" => error_stage::PROCESSING,
86 )
87 .increment(1);
88 }
89 }
90
91 #[derive(Debug)]
92 pub struct SqsMessageDeleteBatchError<E> {
93 pub entries: Vec<DeleteMessageBatchRequestEntry>,
94 pub error: E,
95 }
96
97 impl<E: std::fmt::Display> InternalEvent for SqsMessageDeleteBatchError<E> {
98 fn emit(self) {
99 error!(
100 message = "Deletion of SQS message(s) failed.",
101 message_ids = %self.entries.iter()
102 .map(|x| x.id.as_str())
103 .collect::<Vec<_>>()
104 .join(", "),
105 error = %self.error,
106 error_code = "failed_deleting_all_sqs_messages",
107 error_type = error_type::ACKNOWLEDGMENT_FAILED,
108 stage = error_stage::PROCESSING,
109 internal_log_rate_limit = true,
110 );
111 counter!(
112 "component_errors_total",
113 "error_code" => "failed_deleting_all_sqs_messages",
114 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
115 "stage" => error_stage::PROCESSING,
116 )
117 .increment(1);
118 }
119 }
120
121 #[derive(Debug)]
122 pub struct SqsMessageSentSucceeded {
123 pub message_ids: Vec<SendMessageBatchResultEntry>,
124 }
125
126 impl InternalEvent for SqsMessageSentSucceeded {
127 fn emit(self) {
128 trace!(message = "Deferred SQS message(s).",
129 message_ids = %self.message_ids.iter()
130 .map(|x| x.id.as_str())
131 .collect::<Vec<_>>()
132 .join(", "));
133 counter!("sqs_message_defer_succeeded_total").increment(self.message_ids.len() as u64);
134 }
135 }
136
137 #[derive(Debug)]
138 pub struct SqsMessageSentPartialError {
139 pub entries: Vec<BatchResultErrorEntry>,
140 }
141
142 impl InternalEvent for SqsMessageSentPartialError {
143 fn emit(self) {
144 error!(
145 message = "Sending of deferred SQS message(s) failed.",
146 message_ids = %self.entries.iter()
147 .map(|x| format!("{}/{}", x.id, x.code))
148 .collect::<Vec<_>>()
149 .join(", "),
150 error_code = "failed_deferring_some_sqs_messages",
151 error_type = error_type::ACKNOWLEDGMENT_FAILED,
152 stage = error_stage::PROCESSING,
153 internal_log_rate_limit = true,
154 );
155 counter!(
156 "component_errors_total",
157 "error_code" => "failed_deferring_some_sqs_messages",
158 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
159 "stage" => error_stage::PROCESSING,
160 )
161 .increment(1);
162 }
163 }
164
165 #[derive(Debug)]
166 pub struct SqsMessageSendBatchError<E> {
167 pub entries: Vec<SendMessageBatchRequestEntry>,
168 pub error: E,
169 }
170
171 impl<E: std::fmt::Display> InternalEvent for SqsMessageSendBatchError<E> {
172 fn emit(self) {
173 error!(
174 message = "Sending of deferred SQS message(s) failed.",
175 message_ids = %self.entries.iter()
176 .map(|x| x.id.as_str())
177 .collect::<Vec<_>>()
178 .join(", "),
179 error = %self.error,
180 error_code = "failed_deferring_all_sqs_messages",
181 error_type = error_type::ACKNOWLEDGMENT_FAILED,
182 stage = error_stage::PROCESSING,
183 internal_log_rate_limit = true,
184 );
185 counter!(
186 "component_errors_total",
187 "error_code" => "failed_deferring_all_sqs_messages",
188 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
189 "stage" => error_stage::PROCESSING,
190 )
191 .increment(1);
192 }
193 }
194}
195
196#[derive(Debug)]
197pub struct SqsMessageReceiveError<'a, E> {
198 pub error: &'a E,
199}
200
201impl<E: std::fmt::Display> InternalEvent for SqsMessageReceiveError<'_, E> {
202 fn emit(self) {
203 error!(
204 message = "Failed to fetch SQS events.",
205 error = %self.error,
206 error_code = "failed_fetching_sqs_events",
207 error_type = error_type::REQUEST_FAILED,
208 stage = error_stage::RECEIVING,
209 internal_log_rate_limit = true,
210 );
211 counter!(
212 "component_errors_total",
213 "error_code" => "failed_fetching_sqs_events",
214 "error_type" => error_type::REQUEST_FAILED,
215 "stage" => error_stage::RECEIVING,
216 )
217 .increment(1);
218 }
219}
220
221#[derive(Debug)]
222pub struct SqsMessageReceiveSucceeded {
223 pub count: usize,
224}
225
226impl InternalEvent for SqsMessageReceiveSucceeded {
227 fn emit(self) {
228 trace!(message = "Received SQS messages.", count = %self.count);
229 counter!("sqs_message_receive_succeeded_total").increment(1);
230 counter!("sqs_message_received_messages_total").increment(self.count as u64);
231 }
232}
233
234#[derive(Debug)]
235pub struct SqsMessageProcessingSucceeded<'a> {
236 pub message_id: &'a str,
237}
238
239impl InternalEvent for SqsMessageProcessingSucceeded<'_> {
240 fn emit(self) {
241 trace!(message = "Processed SQS message successfully.", message_id = %self.message_id);
242 counter!("sqs_message_processing_succeeded_total").increment(1);
243 }
244}
245
246#[cfg(feature = "sources-aws_sqs")]
249#[derive(Debug)]
250pub struct SqsMessageDeleteError<'a, E> {
251 pub error: &'a E,
252}
253
254#[cfg(feature = "sources-aws_sqs")]
255impl<E: std::fmt::Display> InternalEvent for SqsMessageDeleteError<'_, E> {
256 fn emit(self) {
257 error!(
258 message = "Failed to delete SQS events.",
259 error = %self.error,
260 error_type = error_type::WRITER_FAILED,
261 stage = error_stage::PROCESSING,
262 internal_log_rate_limit = true,
263 );
264 counter!(
265 "component_errors_total",
266 "error_type" => error_type::WRITER_FAILED,
267 "stage" => error_stage::PROCESSING,
268 )
269 .increment(1);
270 }
271}
272
273#[derive(Debug)]
276pub struct SqsS3EventRecordInvalidEventIgnored<'a> {
277 pub bucket: &'a str,
278 pub key: &'a str,
279 pub kind: &'a str,
280 pub name: &'a str,
281}
282
283impl InternalEvent for SqsS3EventRecordInvalidEventIgnored<'_> {
284 fn emit(self) {
285 warn!(message = "Ignored S3 record in SQS message for an event that was not ObjectCreated.",
286 bucket = %self.bucket, key = %self.key, kind = %self.kind, name = %self.name);
287 counter!("sqs_s3_event_record_ignored_total", "ignore_type" => "invalid_event_kind")
288 .increment(1);
289 }
290}