1use std::{collections::HashMap, panic, str::FromStr, sync::Arc};
2
3use aws_sdk_sqs::{
4 Client as SqsClient,
5 types::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName},
6};
7use chrono::{DateTime, TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use tokio::{pin, select};
10use tracing_futures::Instrument;
11use vector_lib::{
12 config::LogNamespace,
13 finalizer::UnorderedFinalizer,
14 internal_event::{EventsReceived, Registered},
15};
16
17use crate::{
18 SourceSender,
19 codecs::Decoder,
20 event::{BatchNotifier, BatchStatus},
21 internal_events::{
22 EndpointBytesReceived, SqsMessageDeleteError, SqsMessageReceiveError, StreamClosedError,
23 },
24 shutdown::ShutdownSignal,
25 sources::util,
26};
27
/// Upper bound on the number of messages requested by a single
/// `receive_message` call (passed as `max_number_of_messages`).
// NOTE(review): 10 looks like the SQS per-request maximum — confirm against
// the ReceiveMessage API reference before changing it.
const MAX_BATCH_SIZE: i32 = 10;

/// Finalizer whose per-batch payload is the list of receipt handles belonging
/// to that batch; its stream yields `(status, receipts)` pairs.
type Finalizer = UnorderedFinalizer<Vec<String>>;
32
/// Runtime state of the AWS SQS source.
///
/// The value is cloned once per polling task spawned by `run`, so every field
/// is shared (by copy or by handle) between those tasks.
#[derive(Clone)]
pub struct SqsSource {
    /// SQS client used both to receive messages and to delete them.
    pub client: SqsClient,
    /// URL of the queue that is polled and that deletions are issued against.
    pub queue_url: String,
    /// Decoder cloned for every message body handed to `util::decode_message`.
    pub decoder: Decoder,
    /// Value sent as `wait_time_seconds` on every receive request.
    pub poll_secs: u32,
    /// Value sent as `visibility_timeout` on every receive request.
    pub visibility_timeout_secs: u32,
    /// When `true`, received messages are deleted after their events have been
    /// sent downstream (immediately, or once acknowledged if
    /// `acknowledgements` is enabled).
    pub delete_message: bool,
    /// Number of polling tasks spawned by `run`.
    pub concurrency: usize,
    /// When `true`, `run` creates a finalizer and deletion is deferred until a
    /// batch is reported as `BatchStatus::Delivered`.
    pub(super) acknowledgements: bool,
    /// Log namespace forwarded to `util::decode_message`.
    pub(super) log_namespace: LogNamespace,
}
45
46impl SqsSource {
47 pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
48 let mut task_handles = vec![];
49 let finalizer = self.acknowledgements.then(|| {
50 let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
51 let client = self.client.clone();
52 let queue_url = self.queue_url.clone();
53 tokio::spawn(
54 async move {
55 while let Some((status, receipts)) = ack_stream.next().await {
56 if status == BatchStatus::Delivered {
57 delete_messages(client.clone(), receipts, queue_url.clone()).await;
58 }
59 }
60 }
61 .in_current_span(),
62 );
63 Arc::new(finalizer)
64 });
65 let events_received = register!(EventsReceived);
66
67 for _ in 0..self.concurrency {
68 let source = self.clone();
69 let shutdown = shutdown.clone().fuse();
70 let mut out = out.clone();
71 let finalizer = finalizer.clone();
72 let events_received = events_received.clone();
73 task_handles.push(tokio::spawn(
74 async move {
75 let finalizer = finalizer.as_ref();
76 pin!(shutdown);
77 loop {
78 select! {
79 _ = &mut shutdown => break,
80 _ = source.run_once(&mut out, finalizer, events_received.clone()) => {},
81 }
82 }
83 }
84 .in_current_span(),
85 ));
86 }
87
88 for task_handle in task_handles.drain(..) {
91 if let Err(e) = task_handle.await
92 && e.is_panic()
93 {
94 panic::resume_unwind(e.into_panic());
95 }
96 }
97 Ok(())
98 }
99
100 async fn run_once(
101 &self,
102 out: &mut SourceSender,
103 finalizer: Option<&Arc<Finalizer>>,
104 events_received: Registered<EventsReceived>,
105 ) {
106 let result = self
107 .client
108 .receive_message()
109 .queue_url(&self.queue_url)
110 .max_number_of_messages(MAX_BATCH_SIZE)
111 .wait_time_seconds(self.poll_secs as i32)
112 .visibility_timeout(self.visibility_timeout_secs as i32)
113 .message_system_attribute_names(MessageSystemAttributeName::from("SentTimestamp"))
114 .send()
117 .await;
118
119 let receive_message_output = match result {
120 Ok(output) => output,
121 Err(err) => {
122 emit!(SqsMessageReceiveError { error: &err });
123 return;
124 }
125 };
126
127 if let Some(messages) = receive_message_output.messages {
128 let byte_size = messages
129 .iter()
130 .map(|message| message.body().map(|body| body.len()).unwrap_or(0))
131 .sum();
132 emit!(EndpointBytesReceived {
133 byte_size,
134 protocol: "http",
135 endpoint: &self.queue_url
136 });
137
138 let mut receipts_to_ack = Vec::with_capacity(messages.len());
139 let mut events = Vec::with_capacity(messages.len());
140
141 let (batch, batch_receiver) =
142 BatchNotifier::maybe_new_with_receiver(finalizer.is_some());
143 for message in messages {
144 if let Some(body) = message.body {
145 if let Some(receipt_handle) = message.receipt_handle {
147 receipts_to_ack.push(receipt_handle);
148 }
149 let timestamp = get_timestamp(&message.attributes);
150 let decoded = util::decode_message(
153 self.decoder.clone(),
154 "aws_sqs",
155 body.as_bytes(),
156 timestamp,
157 &batch,
158 self.log_namespace,
159 &events_received,
160 );
161 events.extend(decoded);
162 }
163 }
164 drop(batch); let count = events.len();
166
167 match out.send_batch(events).await {
168 Ok(()) => {
169 if self.delete_message {
170 match batch_receiver {
171 Some(receiver) => finalizer
172 .expect("Finalizer must exist for the batch receiver to be created")
173 .add(receipts_to_ack, receiver),
174 None => {
175 delete_messages(
176 self.client.clone(),
177 receipts_to_ack,
178 self.queue_url.clone(),
179 )
180 .await
181 }
182 }
183 }
184 }
185 Err(_) => emit!(StreamClosedError { count }),
186 }
187 }
188 }
189}
190
191fn get_timestamp(
192 attributes: &Option<HashMap<MessageSystemAttributeName, String>>,
193) -> Option<DateTime<Utc>> {
194 attributes.as_ref().and_then(|attributes| {
195 let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?;
196 Some(
197 Utc.timestamp_millis_opt(i64::from_str(sent_time_str).ok()?)
198 .single()
199 .expect("invalid timestamp"),
200 )
201 })
202}
203
204async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: String) {
205 if !receipts.is_empty() {
206 let mut batch = client.delete_message_batch().queue_url(queue_url);
207
208 for (id, receipt) in receipts.into_iter().enumerate() {
209 batch = batch.entries(
210 DeleteMessageBatchRequestEntry::builder()
211 .id(id.to_string())
212 .receipt_handle(receipt)
213 .build()
214 .expect("all required builder parameters specified"),
215 );
216 }
217 if let Err(err) = batch.send().await {
218 emit!(SqsMessageDeleteError { error: &err });
219 }
220 }
221}
222
#[cfg(test)]
mod tests {
    use chrono::SecondsFormat;
    use vector_lib::lookup::path;

    use super::*;
    use crate::{
        codecs::DecodingConfig,
        config::{SourceConfig, log_schema},
        sources::aws_sqs::AwsSqsConfig,
    };

    /// Decoding under the Vector namespace puts the message at the event root
    /// and the timestamp in the source's metadata.
    #[tokio::test]
    async fn test_decode_vector_namespace() {
        let config = AwsSqsConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let events: Vec<_> = util::decode_message(
            DecodingConfig::new(
                config.framing.clone(),
                config.decoding,
                LogNamespace::Vector,
            )
            .build()
            .unwrap(),
            "aws_sqs",
            // Derive the input from `message` so the assertion below cannot
            // drift from what was actually decoded.
            message.as_bytes(),
            Some(now),
            &None,
            LogNamespace::Vector,
            &register!(EventsReceived),
        )
        .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log().get(".").unwrap().to_string_lossy(),
            message
        );
        assert_eq!(
            events[0]
                .as_log()
                .metadata()
                .value()
                .get(path!(AwsSqsConfig::NAME, "timestamp"))
                .unwrap()
                .to_string_lossy(),
            now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// Decoding under the legacy namespace stores the message and timestamp
    /// under the globally configured log schema keys.
    #[tokio::test]
    async fn test_decode_legacy_namespace() {
        let config = AwsSqsConfig {
            log_namespace: None,
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let events: Vec<_> = util::decode_message(
            DecodingConfig::new(
                config.framing.clone(),
                config.decoding,
                LogNamespace::Legacy,
            )
            .build()
            .unwrap(),
            "aws_sqs",
            message.as_bytes(),
            Some(now),
            &None,
            LogNamespace::Legacy,
            &register!(EventsReceived),
        )
        .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0]
                .as_log()
                .get(log_schema().message_key_target_path().unwrap())
                .unwrap()
                .to_string_lossy(),
            message
        );
        assert_eq!(
            events[0]
                .as_log()
                .get_timestamp()
                .unwrap()
                .to_string_lossy(),
            now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// A well-formed `SentTimestamp` is parsed as epoch milliseconds.
    #[test]
    fn test_get_timestamp() {
        let attributes = HashMap::from([(
            MessageSystemAttributeName::SentTimestamp,
            "1636408546018".to_string(),
        )]);

        assert_eq!(
            get_timestamp(&Some(attributes)),
            Some(
                Utc.timestamp_millis_opt(1636408546018)
                    .single()
                    .expect("invalid timestamp")
            )
        );
    }

    /// Missing attributes and non-numeric values yield no timestamp.
    #[test]
    fn test_get_timestamp_absent_or_unparsable() {
        assert_eq!(get_timestamp(&None), None);
        assert_eq!(get_timestamp(&Some(HashMap::new())), None);

        let attributes = HashMap::from([(
            MessageSystemAttributeName::SentTimestamp,
            "not-a-number".to_string(),
        )]);
        assert_eq!(get_timestamp(&Some(attributes)), None);
    }
}