vector/sources/aws_sqs/
source.rs

1use std::{collections::HashMap, panic, str::FromStr, sync::Arc};
2
3use aws_sdk_sqs::{
4    types::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName},
5    Client as SqsClient,
6};
7use chrono::{DateTime, TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use tokio::{pin, select};
10use tracing_futures::Instrument;
11use vector_lib::config::LogNamespace;
12use vector_lib::finalizer::UnorderedFinalizer;
13use vector_lib::internal_event::{EventsReceived, Registered};
14
15use crate::{
16    codecs::Decoder,
17    event::{BatchNotifier, BatchStatus},
18    internal_events::{
19        EndpointBytesReceived, SqsMessageDeleteError, SqsMessageReceiveError, StreamClosedError,
20    },
21    shutdown::ShutdownSignal,
22    sources::util,
23    SourceSender,
24};
25
// This is the maximum SQS supports in a single batch request. It is passed as
// `max_number_of_messages` on every receive request issued by `run_once`.
const MAX_BATCH_SIZE: i32 = 10;

/// Finalizer used when acknowledgements are enabled. Each entry carries the
/// receipt handles (`Vec<String>`) of one received batch, which are handed to
/// `delete_messages` once that batch is reported as delivered.
type Finalizer = UnorderedFinalizer<Vec<String>>;
30
/// Runtime state of the `aws_sqs` source.
///
/// The value is cloned once per polling task spawned by [`SqsSource::run`].
#[derive(Clone)]
pub struct SqsSource {
    /// SQS client used for both receiving and deleting messages.
    pub client: SqsClient,
    /// URL of the queue that is polled.
    pub queue_url: String,
    /// Decoder applied to the body of every received message.
    pub decoder: Decoder,
    /// Passed as `wait_time_seconds` on each receive request.
    pub poll_secs: u32,
    /// Passed as `visibility_timeout` on each receive request.
    pub visibility_timeout_secs: u32,
    /// Whether messages are deleted from the queue after their events were
    /// sent downstream successfully.
    pub delete_message: bool,
    /// Number of polling tasks spawned by `run`.
    pub concurrency: usize,
    /// When `true`, `run` creates a finalizer and deletion (if enabled) is
    /// deferred until the batch is reported as delivered.
    pub(super) acknowledgements: bool,
    /// Log namespace handed to the message decoding helper.
    pub(super) log_namespace: LogNamespace,
}
43
44impl SqsSource {
45    pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
46        let mut task_handles = vec![];
47        let finalizer = self.acknowledgements.then(|| {
48            let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
49            let client = self.client.clone();
50            let queue_url = self.queue_url.clone();
51            tokio::spawn(
52                async move {
53                    while let Some((status, receipts)) = ack_stream.next().await {
54                        if status == BatchStatus::Delivered {
55                            delete_messages(client.clone(), receipts, queue_url.clone()).await;
56                        }
57                    }
58                }
59                .in_current_span(),
60            );
61            Arc::new(finalizer)
62        });
63        let events_received = register!(EventsReceived);
64
65        for _ in 0..self.concurrency {
66            let source = self.clone();
67            let shutdown = shutdown.clone().fuse();
68            let mut out = out.clone();
69            let finalizer = finalizer.clone();
70            let events_received = events_received.clone();
71            task_handles.push(tokio::spawn(
72                async move {
73                    let finalizer = finalizer.as_ref();
74                    pin!(shutdown);
75                    loop {
76                        select! {
77                            _ = &mut shutdown => break,
78                            _ = source.run_once(&mut out, finalizer, events_received.clone()) => {},
79                        }
80                    }
81                }
82                .in_current_span(),
83            ));
84        }
85
86        // Wait for all of the processes to finish.  If any one of them panics, we resume
87        // that panic here to properly shutdown Vector.
88        for task_handle in task_handles.drain(..) {
89            if let Err(e) = task_handle.await {
90                if e.is_panic() {
91                    panic::resume_unwind(e.into_panic());
92                }
93            }
94        }
95        Ok(())
96    }
97
98    async fn run_once(
99        &self,
100        out: &mut SourceSender,
101        finalizer: Option<&Arc<Finalizer>>,
102        events_received: Registered<EventsReceived>,
103    ) {
104        let result = self
105            .client
106            .receive_message()
107            .queue_url(&self.queue_url)
108            .max_number_of_messages(MAX_BATCH_SIZE)
109            .wait_time_seconds(self.poll_secs as i32)
110            .visibility_timeout(self.visibility_timeout_secs as i32)
111            .message_system_attribute_names(MessageSystemAttributeName::from("SentTimestamp"))
112            // I think this should be a known attribute
113            // https://github.com/awslabs/aws-sdk-rust/issues/411
114            .send()
115            .await;
116
117        let receive_message_output = match result {
118            Ok(output) => output,
119            Err(err) => {
120                emit!(SqsMessageReceiveError { error: &err });
121                return;
122            }
123        };
124
125        if let Some(messages) = receive_message_output.messages {
126            let byte_size = messages
127                .iter()
128                .map(|message| message.body().map(|body| body.len()).unwrap_or(0))
129                .sum();
130            emit!(EndpointBytesReceived {
131                byte_size,
132                protocol: "http",
133                endpoint: &self.queue_url
134            });
135
136            let mut receipts_to_ack = Vec::with_capacity(messages.len());
137            let mut events = Vec::with_capacity(messages.len());
138
139            let (batch, batch_receiver) =
140                BatchNotifier::maybe_new_with_receiver(finalizer.is_some());
141            for message in messages {
142                if let Some(body) = message.body {
143                    // a receipt handle should always exist
144                    if let Some(receipt_handle) = message.receipt_handle {
145                        receipts_to_ack.push(receipt_handle);
146                    }
147                    let timestamp = get_timestamp(&message.attributes);
148                    // Error is logged by `crate::codecs::Decoder`, no further handling
149                    // is needed here.
150                    let decoded = util::decode_message(
151                        self.decoder.clone(),
152                        "aws_sqs",
153                        body.as_bytes(),
154                        timestamp,
155                        &batch,
156                        self.log_namespace,
157                        &events_received,
158                    );
159                    events.extend(decoded);
160                }
161            }
162            drop(batch); // Drop last reference to batch acknowledgement finalizer
163            let count = events.len();
164
165            match out.send_batch(events).await {
166                Ok(()) => {
167                    if self.delete_message {
168                        match batch_receiver {
169                            Some(receiver) => finalizer
170                                .expect("Finalizer must exist for the batch receiver to be created")
171                                .add(receipts_to_ack, receiver),
172                            None => {
173                                delete_messages(
174                                    self.client.clone(),
175                                    receipts_to_ack,
176                                    self.queue_url.clone(),
177                                )
178                                .await
179                            }
180                        }
181                    }
182                }
183                Err(_) => emit!(StreamClosedError { count }),
184            }
185        }
186    }
187}
188
189fn get_timestamp(
190    attributes: &Option<HashMap<MessageSystemAttributeName, String>>,
191) -> Option<DateTime<Utc>> {
192    attributes.as_ref().and_then(|attributes| {
193        let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?;
194        Some(
195            Utc.timestamp_millis_opt(i64::from_str(sent_time_str).ok()?)
196                .single()
197                .expect("invalid timestamp"),
198        )
199    })
200}
201
202async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: String) {
203    if !receipts.is_empty() {
204        let mut batch = client.delete_message_batch().queue_url(queue_url);
205
206        for (id, receipt) in receipts.into_iter().enumerate() {
207            batch = batch.entries(
208                DeleteMessageBatchRequestEntry::builder()
209                    .id(id.to_string())
210                    .receipt_handle(receipt)
211                    .build()
212                    .expect("all required builder parameters specified"),
213            );
214        }
215        if let Err(err) = batch.send().await {
216            emit!(SqsMessageDeleteError { error: &err });
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codecs::DecodingConfig;
    use crate::config::{log_schema, SourceConfig};
    use crate::sources::aws_sqs::AwsSqsConfig;
    use chrono::SecondsFormat;
    use vector_lib::lookup::path;

    /// Decoding under the Vector namespace puts the payload at the event root
    /// and the timestamp into the source's metadata.
    #[tokio::test]
    async fn test_decode_vector_namespace() {
        let config = AwsSqsConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding,
            LogNamespace::Vector,
        )
        .build()
        .unwrap();
        let events_received = register!(EventsReceived);
        let events: Vec<_> = util::decode_message(
            decoder,
            "aws_sqs",
            b"test",
            Some(now),
            &None,
            LogNamespace::Vector,
            &events_received,
        )
        .collect();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();
        assert_eq!(log.get(".").unwrap().to_string_lossy(), message);

        let expected_timestamp = now.to_rfc3339_opts(SecondsFormat::AutoSi, true);
        assert_eq!(
            log.metadata()
                .value()
                .get(path!(AwsSqsConfig::NAME, "timestamp"))
                .unwrap()
                .to_string_lossy(),
            expected_timestamp
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// Decoding under the legacy namespace uses the globally configured
    /// message key and the event's timestamp field.
    #[tokio::test]
    async fn test_decode_legacy_namespace() {
        let config = AwsSqsConfig {
            log_namespace: None,
            ..Default::default()
        };
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let message = "test";
        let now = Utc::now();
        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding,
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();
        let events_received = register!(EventsReceived);
        let events: Vec<_> = util::decode_message(
            decoder,
            "aws_sqs",
            b"test",
            Some(now),
            &None,
            LogNamespace::Legacy,
            &events_received,
        )
        .collect();
        assert_eq!(events.len(), 1);

        let log = events[0].as_log();
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap())
                .unwrap()
                .to_string_lossy(),
            message
        );

        let expected_timestamp = now.to_rfc3339_opts(SecondsFormat::AutoSi, true);
        assert_eq!(
            log.get_timestamp().unwrap().to_string_lossy(),
            expected_timestamp
        );
        definitions.unwrap().assert_valid_for_event(&events[0]);
    }

    /// A millisecond `SentTimestamp` attribute is converted to the matching
    /// UTC timestamp.
    #[test]
    fn test_get_timestamp() {
        let mut attributes = HashMap::new();
        attributes.insert(
            MessageSystemAttributeName::SentTimestamp,
            "1636408546018".to_string(),
        );

        let expected = Utc
            .timestamp_millis_opt(1636408546018)
            .single()
            .expect("invalid timestamp");

        assert_eq!(get_timestamp(&Some(attributes)), Some(expected));
    }
}