vector/sources/aws_sqs/
source.rs

1use std::{collections::HashMap, panic, str::FromStr, sync::Arc};
2
3use aws_sdk_sqs::{
4    Client as SqsClient,
5    types::{DeleteMessageBatchRequestEntry, MessageSystemAttributeName},
6};
7use chrono::{DateTime, TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use tokio::{pin, select};
10use tracing_futures::Instrument;
11use vector_lib::{
12    config::LogNamespace,
13    finalizer::UnorderedFinalizer,
14    internal_event::{EventsReceived, Registered},
15};
16
17use crate::{
18    SourceSender,
19    codecs::Decoder,
20    event::{BatchNotifier, BatchStatus},
21    internal_events::{
22        EndpointBytesReceived, SqsMessageDeleteError, SqsMessageReceiveError, StreamClosedError,
23    },
24    shutdown::ShutdownSignal,
25    sources::util,
26};
27
/// Maximum number of messages requested from SQS in one receive call.
///
/// This is the maximum SQS supports in a single batch request.
const MAX_BATCH_SIZE: i32 = 10;

/// Finalizer whose entries are the receipt handles of one received batch, so that
/// the messages can be deleted once the delivery status of the batch is known.
type Finalizer = UnorderedFinalizer<Vec<String>>;
32
/// Runtime state of the `aws_sqs` source. `run` clones it once per polling task.
#[derive(Clone)]
pub struct SqsSource {
    /// SQS client used both to receive and to delete messages.
    pub client: SqsClient,
    /// URL of the queue that is polled.
    pub queue_url: String,
    /// Decoder applied to the body of every received message.
    pub decoder: Decoder,
    /// Sent as `wait_time_seconds` on every receive request.
    pub poll_secs: u32,
    /// Sent as `visibility_timeout` on every receive request.
    pub visibility_timeout_secs: u32,
    /// When `false`, received messages are never deleted by this source.
    pub delete_message: bool,
    /// Number of polling tasks spawned by `run`.
    pub concurrency: usize,
    /// When `true`, a finalizer is created and deletion is deferred until the events
    /// of a batch are reported as delivered.
    pub(super) acknowledgements: bool,
    /// Log namespace handed to the decoding helper when events are built.
    pub(super) log_namespace: LogNamespace,
}
45
46impl SqsSource {
47    pub async fn run(self, out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
48        let mut task_handles = vec![];
49        let finalizer = self.acknowledgements.then(|| {
50            let (finalizer, mut ack_stream) = Finalizer::new(Some(shutdown.clone()));
51            let client = self.client.clone();
52            let queue_url = self.queue_url.clone();
53            tokio::spawn(
54                async move {
55                    while let Some((status, receipts)) = ack_stream.next().await {
56                        if status == BatchStatus::Delivered {
57                            delete_messages(client.clone(), receipts, queue_url.clone()).await;
58                        }
59                    }
60                }
61                .in_current_span(),
62            );
63            Arc::new(finalizer)
64        });
65        let events_received = register!(EventsReceived);
66
67        for _ in 0..self.concurrency {
68            let source = self.clone();
69            let shutdown = shutdown.clone().fuse();
70            let mut out = out.clone();
71            let finalizer = finalizer.clone();
72            let events_received = events_received.clone();
73            task_handles.push(tokio::spawn(
74                async move {
75                    let finalizer = finalizer.as_ref();
76                    pin!(shutdown);
77                    loop {
78                        select! {
79                            _ = &mut shutdown => break,
80                            _ = source.run_once(&mut out, finalizer, events_received.clone()) => {},
81                        }
82                    }
83                }
84                .in_current_span(),
85            ));
86        }
87
88        // Wait for all of the processes to finish.  If any one of them panics, we resume
89        // that panic here to properly shutdown Vector.
90        for task_handle in task_handles.drain(..) {
91            if let Err(e) = task_handle.await
92                && e.is_panic()
93            {
94                panic::resume_unwind(e.into_panic());
95            }
96        }
97        Ok(())
98    }
99
100    async fn run_once(
101        &self,
102        out: &mut SourceSender,
103        finalizer: Option<&Arc<Finalizer>>,
104        events_received: Registered<EventsReceived>,
105    ) {
106        let result = self
107            .client
108            .receive_message()
109            .queue_url(&self.queue_url)
110            .max_number_of_messages(MAX_BATCH_SIZE)
111            .wait_time_seconds(self.poll_secs as i32)
112            .visibility_timeout(self.visibility_timeout_secs as i32)
113            .message_system_attribute_names(MessageSystemAttributeName::from("SentTimestamp"))
114            // I think this should be a known attribute
115            // https://github.com/awslabs/aws-sdk-rust/issues/411
116            .send()
117            .await;
118
119        let receive_message_output = match result {
120            Ok(output) => output,
121            Err(err) => {
122                emit!(SqsMessageReceiveError { error: &err });
123                return;
124            }
125        };
126
127        if let Some(messages) = receive_message_output.messages {
128            let byte_size = messages
129                .iter()
130                .map(|message| message.body().map(|body| body.len()).unwrap_or(0))
131                .sum();
132            emit!(EndpointBytesReceived {
133                byte_size,
134                protocol: "http",
135                endpoint: &self.queue_url
136            });
137
138            let mut receipts_to_ack = Vec::with_capacity(messages.len());
139            let mut events = Vec::with_capacity(messages.len());
140
141            let (batch, batch_receiver) =
142                BatchNotifier::maybe_new_with_receiver(finalizer.is_some());
143            for message in messages {
144                if let Some(body) = message.body {
145                    // a receipt handle should always exist
146                    if let Some(receipt_handle) = message.receipt_handle {
147                        receipts_to_ack.push(receipt_handle);
148                    }
149                    let timestamp = get_timestamp(&message.attributes);
150                    // Error is logged by `crate::codecs::Decoder`, no further handling
151                    // is needed here.
152                    let decoded = util::decode_message(
153                        self.decoder.clone(),
154                        "aws_sqs",
155                        body.as_bytes(),
156                        timestamp,
157                        &batch,
158                        self.log_namespace,
159                        &events_received,
160                    );
161                    events.extend(decoded);
162                }
163            }
164            drop(batch); // Drop last reference to batch acknowledgement finalizer
165            let count = events.len();
166
167            match out.send_batch(events).await {
168                Ok(()) => {
169                    if self.delete_message {
170                        match batch_receiver {
171                            Some(receiver) => finalizer
172                                .expect("Finalizer must exist for the batch receiver to be created")
173                                .add(receipts_to_ack, receiver),
174                            None => {
175                                delete_messages(
176                                    self.client.clone(),
177                                    receipts_to_ack,
178                                    self.queue_url.clone(),
179                                )
180                                .await
181                            }
182                        }
183                    }
184                }
185                Err(_) => emit!(StreamClosedError { count }),
186            }
187        }
188    }
189}
190
191fn get_timestamp(
192    attributes: &Option<HashMap<MessageSystemAttributeName, String>>,
193) -> Option<DateTime<Utc>> {
194    attributes.as_ref().and_then(|attributes| {
195        let sent_time_str = attributes.get(&MessageSystemAttributeName::SentTimestamp)?;
196        Some(
197            Utc.timestamp_millis_opt(i64::from_str(sent_time_str).ok()?)
198                .single()
199                .expect("invalid timestamp"),
200        )
201    })
202}
203
204async fn delete_messages(client: SqsClient, receipts: Vec<String>, queue_url: String) {
205    if !receipts.is_empty() {
206        let mut batch = client.delete_message_batch().queue_url(queue_url);
207
208        for (id, receipt) in receipts.into_iter().enumerate() {
209            batch = batch.entries(
210                DeleteMessageBatchRequestEntry::builder()
211                    .id(id.to_string())
212                    .receipt_handle(receipt)
213                    .build()
214                    .expect("all required builder parameters specified"),
215            );
216        }
217        if let Err(err) = batch.send().await {
218            emit!(SqsMessageDeleteError { error: &err });
219        }
220    }
221}
222
#[cfg(test)]
mod tests {
    use chrono::SecondsFormat;
    use vector_lib::lookup::path;

    use super::*;
    use crate::{
        codecs::DecodingConfig,
        config::{SourceConfig, log_schema},
        sources::aws_sqs::AwsSqsConfig,
    };

    /// With the Vector namespace the decoded body is the event root and the timestamp
    /// is stored in the event metadata under the source name.
    #[tokio::test]
    async fn test_decode_vector_namespace() {
        let config = AwsSqsConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let body = "test";
        let sent_at = Utc::now();
        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding,
            LogNamespace::Vector,
        )
        .build()
        .unwrap();
        let events: Vec<_> = util::decode_message(
            decoder,
            "aws_sqs",
            b"test",
            Some(sent_at),
            &None,
            LogNamespace::Vector,
            &register!(EventsReceived),
        )
        .collect();

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log.get(".").unwrap().to_string_lossy(), body);
        assert_eq!(
            log.metadata()
                .value()
                .get(path!(AwsSqsConfig::NAME, "timestamp"))
                .unwrap()
                .to_string_lossy(),
            sent_at.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definition.unwrap().assert_valid_for_event(&events[0]);
    }

    /// With the legacy namespace the decoded body lives under the configured message
    /// key and the timestamp under the event's timestamp field.
    #[tokio::test]
    async fn test_decode_legacy_namespace() {
        let config = AwsSqsConfig {
            log_namespace: None,
            ..Default::default()
        };
        let definition = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let body = "test";
        let sent_at = Utc::now();
        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding,
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();
        let events: Vec<_> = util::decode_message(
            decoder,
            "aws_sqs",
            b"test",
            Some(sent_at),
            &None,
            LogNamespace::Legacy,
            &register!(EventsReceived),
        )
        .collect();

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get(log_schema().message_key_target_path().unwrap())
                .unwrap()
                .to_string_lossy(),
            body
        );
        assert_eq!(
            log.get_timestamp().unwrap().to_string_lossy(),
            sent_at.to_rfc3339_opts(SecondsFormat::AutoSi, true)
        );
        definition.unwrap().assert_valid_for_event(&events[0]);
    }

    /// A well-formed `SentTimestamp` attribute is parsed as epoch milliseconds.
    #[test]
    fn test_get_timestamp() {
        let mut attributes = HashMap::new();
        attributes.insert(
            MessageSystemAttributeName::SentTimestamp,
            "1636408546018".to_string(),
        );
        let expected = Utc
            .timestamp_millis_opt(1636408546018)
            .single()
            .expect("invalid timestamp");

        assert_eq!(get_timestamp(&Some(attributes)), Some(expected));
    }
}