1use std::{collections::HashMap, panic, str::FromStr, sync::Arc};
2
3use aws_sdk_sqs::{
4 types::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName},
5 Client as SqsClient,
6};
7use chrono::{DateTime, TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use tokio::{pin, select};
10use tracing_futures::Instrument;
11use vector_lib::config::LogNamespace;
12use vector_lib::finalizer::UnorderedFinalizer;
13use vector_lib::internal_event::{EventsReceived, Registered};
14
15use crate::{
16 codecs::Decoder,
17 event::{BatchNotifier, BatchStatus},
18 internal_events::{
19 EndpointBytesReceived, SqsMessageDeleteError, SqsMessageReceiveError, StreamClosedError,
20 },
21 shutdown::ShutdownSignal,
22 sources::util,
23 SourceSender,
24};
25
/// Number of messages requested from SQS per `receive_message` call (passed as
/// `max_number_of_messages`).
// NOTE(review): 10 appears to be the per-request maximum SQS allows — confirm against AWS docs.
const MAX_BATCH_SIZE: i32 = 10;

/// Acknowledgement finalizer whose entries are the receipt handles of one received batch; the
/// handles come back together with the batch status once that batch has been finalized.
type Finalizer = UnorderedFinalizer<Vec<String>>;
30
/// Runtime state of the AWS SQS source: everything a polling task needs to receive, decode,
/// forward and delete messages. It is cloned once per polling task.
#[derive(Clone)]
pub struct SqsSource {
    /// SQS client used both to receive messages and to delete them in batches.
    pub client: SqsClient,
    /// URL of the queue to poll; also reported as the endpoint in `EndpointBytesReceived`.
    pub queue_url: String,
    /// Decoder applied to every message body (cloned for each message).
    pub decoder: Decoder,
    /// Value sent as `wait_time_seconds` on each receive request.
    pub poll_secs: u32,
    /// Value sent as `visibility_timeout` on each receive request.
    pub visibility_timeout_secs: u32,
    /// When `true`, messages are deleted from the queue after their events were sent downstream
    /// (immediately, or once acknowledged if `acknowledgements` is enabled).
    pub delete_message: bool,
    /// Number of polling tasks spawned by `run`.
    pub concurrency: usize,
    /// When `true`, deletion is deferred until the batch is reported as `BatchStatus::Delivered`.
    pub(super) acknowledgements: bool,
    /// Log namespace handed to the decoding helper for every message.
    pub(super) log_namespace: LogNamespace,
}
43
44impl SqsSource {
45 pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
46 let mut task_handles = vec![];
47 let finalizer = self.acknowledgements.then(|| {
48 let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
49 let client = self.client.clone();
50 let queue_url = self.queue_url.clone();
51 tokio::spawn(
52 async move {
53 while let Some((status, receipts)) = ack_stream.next().await {
54 if status == BatchStatus::Delivered {
55 delete_messages(client.clone(), receipts, queue_url.clone()).await;
56 }
57 }
58 }
59 .in_current_span(),
60 );
61 Arc::new(finalizer)
62 });
63 let events_received = register!(EventsReceived);
64
65 for _ in 0..self.concurrency {
66 let source = self.clone();
67 let shutdown = shutdown.clone().fuse();
68 let mut out = out.clone();
69 let finalizer = finalizer.clone();
70 let events_received = events_received.clone();
71 task_handles.push(tokio::spawn(
72 async move {
73 let finalizer = finalizer.as_ref();
74 pin!(shutdown);
75 loop {
76 select! {
77 _ = &mut shutdown => break,
78 _ = source.run_once(&mut out, finalizer, events_received.clone()) => {},
79 }
80 }
81 }
82 .in_current_span(),
83 ));
84 }
85
86 for task_handle in task_handles.drain(..) {
89 if let Err(e) = task_handle.await {
90 if e.is_panic() {
91 panic::resume_unwind(e.into_panic());
92 }
93 }
94 }
95 Ok(())
96 }
97
98 async fn run_once(
99 &self,
100 out: &mut SourceSender,
101 finalizer: Option<&Arc<Finalizer>>,
102 events_received: Registered<EventsReceived>,
103 ) {
104 let result = self
105 .client
106 .receive_message()
107 .queue_url(&self.queue_url)
108 .max_number_of_messages(MAX_BATCH_SIZE)
109 .wait_time_seconds(self.poll_secs as i32)
110 .visibility_timeout(self.visibility_timeout_secs as i32)
111 .message_system_attribute_names(MessageSystemAttributeName::from("SentTimestamp"))
112 .send()
115 .await;
116
117 let receive_message_output = match result {
118 Ok(output) => output,
119 Err(err) => {
120 emit!(SqsMessageReceiveError { error: &err });
121 return;
122 }
123 };
124
125 if let Some(messages) = receive_message_output.messages {
126 let byte_size = messages
127 .iter()
128 .map(|message| message.body().map(|body| body.len()).unwrap_or(0))
129 .sum();
130 emit!(EndpointBytesReceived {
131 byte_size,
132 protocol: "http",
133 endpoint: &self.queue_url
134 });
135
136 let mut receipts_to_ack = Vec::with_capacity(messages.len());
137 let mut events = Vec::with_capacity(messages.len());
138
139 let (batch, batch_receiver) =
140 BatchNotifier::maybe_new_with_receiver(finalizer.is_some());
141 for message in messages {
142 if let Some(body) = message.body {
143 if let Some(receipt_handle) = message.receipt_handle {
145 receipts_to_ack.push(receipt_handle);
146 }
147 let timestamp = get_timestamp(&message.attributes);
148 let decoded = util::decode_message(
151 self.decoder.clone(),
152 "aws_sqs",
153 body.as_bytes(),
154 timestamp,
155 &batch,
156 self.log_namespace,
157 &events_received,
158 );
159 events.extend(decoded);
160 }
161 }
162 drop(batch); let count = events.len();
164
165 match out.send_batch(events).await {
166 Ok(()) => {
167 if self.delete_message {
168 match batch_receiver {
169 Some(receiver) => finalizer
170 .expect("Finalizer must exist for the batch receiver to be created")
171 .add(receipts_to_ack, receiver),
172 None => {
173 delete_messages(
174 self.client.clone(),
175 receipts_to_ack,
176 self.queue_url.clone(),
177 )
178 .await
179 }
180 }
181 }
182 }
183 Err(_) => emit!(StreamClosedError { count }),
184 }
185 }
186 }
187}
188
189fn get_timestamp(
190 attributes: &Option<HashMap<MessageSystemAttributeName, String>>,
191) -> Option<DateTime<Utc>> {
192 attributes.as_ref().and_then(|attributes| {
193 let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?;
194 Some(
195 Utc.timestamp_millis_opt(i64::from_str(sent_time_str).ok()?)
196 .single()
197 .expect("invalid timestamp"),
198 )
199 })
200}
201
202async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: String) {
203 if !receipts.is_empty() {
204 let mut batch = client.delete_message_batch().queue_url(queue_url);
205
206 for (id, receipt) in receipts.into_iter().enumerate() {
207 batch = batch.entries(
208 DeleteMessageBatchRequestEntry::builder()
209 .id(id.to_string())
210 .receipt_handle(receipt)
211 .build()
212 .expect("all required builder parameters specified"),
213 );
214 }
215 if let Err(err) = batch.send().await {
216 emit!(SqsMessageDeleteError { error: &err });
217 }
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codecs::DecodingConfig;
    use crate::config::{log_schema, SourceConfig};
    use crate::sources::aws_sqs::AwsSqsConfig;
    use chrono::SecondsFormat;
    use vector_lib::lookup::path;

    /// With the Vector namespace, the payload becomes the event root and the timestamp is
    /// stored in the source's metadata.
    #[tokio::test]
    async fn test_decode_vector_namespace() {
        let config = AwsSqsConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let events: Vec<_> = util::decode_message(
            DecodingConfig::new(
                config.framing.clone(),
                config.decoding,
                LogNamespace::Vector,
            )
            .build()
            .unwrap(),
            "aws_sqs",
            message.as_bytes(),
            Some(now),
            &None,
            LogNamespace::Vector,
            &register!(EventsReceived),
        )
        .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_log().get(".").unwrap().to_string_lossy(),
            message
        );
        assert_eq!(
            events[0]
                .as_log()
                .metadata()
                .value()
                .get(path!(AwsSqsConfig::NAME, "timestamp"))
                .unwrap()
                .to_string_lossy(),
            now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// With the legacy namespace, the payload is stored under the configured message key and
    /// the timestamp under the event's timestamp field.
    #[tokio::test]
    async fn test_decode_legacy_namespace() {
        let config = AwsSqsConfig {
            log_namespace: None,
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let events: Vec<_> = util::decode_message(
            DecodingConfig::new(
                config.framing.clone(),
                config.decoding,
                LogNamespace::Legacy,
            )
            .build()
            .unwrap(),
            "aws_sqs",
            message.as_bytes(),
            Some(now),
            &None,
            LogNamespace::Legacy,
            &register!(EventsReceived),
        )
        .collect();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0]
                .as_log()
                .get(log_schema().message_key_target_path().unwrap())
                .unwrap()
                .to_string_lossy(),
            message
        );
        assert_eq!(
            events[0]
                .as_log()
                .get_timestamp()
                .unwrap()
                .to_string_lossy(),
            now.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// A well-formed `SentTimestamp` (milliseconds since the epoch) is converted to UTC.
    #[test]
    fn test_get_timestamp() {
        let attributes = HashMap::from([(
            MessageSystemAttributeName::SentTimestamp,
            "1636408546018".to_string(),
        )]);

        assert_eq!(
            get_timestamp(&Some(attributes)),
            Some(
                Utc.timestamp_millis_opt(1636408546018)
                    .single()
                    .expect("invalid timestamp")
            )
        );
    }

    /// Missing attributes or a non-numeric `SentTimestamp` yield no timestamp.
    #[test]
    fn test_get_timestamp_missing_or_unparsable() {
        assert_eq!(get_timestamp(&None), None);
        assert_eq!(get_timestamp(&Some(HashMap::new())), None);

        let attributes = HashMap::from([(
            MessageSystemAttributeName::SentTimestamp,
            "not-a-number".to_string(),
        )]);
        assert_eq!(get_timestamp(&Some(attributes)), None);
    }
}