vector/sources/aws_s3/sqs.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
    time::Duration,
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    common::backoff::ExponentialBackoff,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsMessageSendBatchError,
        SqsMessageSentPartialError, SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

/// Configuration for deferring events based on their age.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the queue to forward events to when they are older than `max_age_secs`.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// An event must have been emitted within the last `max_age_secs` seconds to be processed.
    ///
    /// If the event is older, it is forwarded to the `queue_url` for later processing.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

/// SQS configuration options.
//
// TODO: It seems awfully likely that we could re-use the existing configuration type for the `aws_sqs` source in some
// way, given the near 100% overlap in configurable values.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this should not be changed unless instructed to do so: if messages are
    /// available, they are always consumed immediately, regardless of the value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a received
    /// message takes longer than `visibility_timeout_secs` to process and delete from the queue,
    /// it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete non-retryable messages.
    ///
    /// If a message is rejected by the sink and not retryable, it is deleted from the queue.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the objects
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as the SQS message consumption rate affects the S3 object
    /// retrieval rate.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages to poll from SQS in a batch.
    ///
    /// Defaults to 10.
    ///
    /// Should be set to a smaller value when the files are large, to help prevent the ingestion
    /// of one file from causing other files to exceed `visibility_timeout_secs`. Valid values
    /// are 1 to 10.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Client timeout configuration for SQS operations. Take care when configuring these settings
    // to allow enough time for the polling interval configured in `poll_secs`.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    /// Configuration for deferring events to another queue based on their age.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all of the processes to finish. If any one of them panics, we resume
        // that panic here to properly shut down Vector.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
    backoff: ExponentialBackoff,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
            backoff: ExponentialBackoff::from_millis(2)
                .factor(250)
                .max_delay(Duration::from_secs(30)),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                result = self.run_once() => {
                    match result {
                        Ok(()) => {
                            // Reset backoff on successful receive
                            self.backoff.reset();
                        }
                        Err(_) => {
                            let delay = self.backoff.next().expect("backoff never ends");
                            trace!(
                                delay_ms = delay.as_millis(),
                                "`run_once` failed, will retry after delay.",
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }
                },
            }
        }
    }

    async fn run_once(&mut self) -> Result<(), ()> {
        let messages = match self.receive_messages().await {
            Ok(messages) => {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
                messages
            }
            Err(err) => {
                emit!(SqsMessageReceiveError { error: &err });
                return Err(());
            }
        };

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    // I don't think this will ever actually happen; it is just an artifact of
                    // the AWS API's predilection for returning nullable values for all response
                    // attributes
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            // Maybe delete the message from the current queue, since we have processed it.
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        // TODO: consider removing failed deferrals from `delete_entries`.
        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!("Deferred queue not configured, but received deferred entries.");
                return Ok(());
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            // We need these for a correct error message if the batch fails overall.
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    // Batch deletes can have partial successes/failures, so we have to check
                    // for both cases and emit accordingly.
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        Ok(())
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // S3 has to send notifications to a queue in the same region, so I don't think this will
        // ever actually be hit unless messages are being forwarded from one queue to another.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Record the read error seen to propagate up later so we avoid ack'ing the SQS
        // message.
        //
        // The error is stored as a boxed `FramingError`, as we cannot clone
        // `std::io::Error` to take ownership in the closure.
        //
        // FramedRead likely stops when it gets an i/o error, but I found it clearer to
        // show with `take_while` that we stop once there has been an error.
        //
        // This can result in objects being partially processed before an error, but we
        // prefer duplicate lines over message loss. Future work could include recording
        // the offset of the object that has been read, but this would only be relevant in
        // the case that the same Vector instance processes the same message.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // Error is handled by `codecs::Decoder`, no further handling
                    // is needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        // Up above, `lines` captures `read_error`, and eventually is captured by `stream`,
        // so we explicitly drop it so that we can again utilize `read_error` below.
        drop(stream);

        // The BatchNotifier is cloned for each LogEvent in the batch stream, but the last
        // reference must be dropped before the status of the batch is sent to the channel.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // This handles the transition from the original timestamp logic. Originally the
    // `timestamp_key` was populated by the `last_modified` time on the object, falling
    // back to calling `now()`.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

// https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}
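
// Hedged example (not part of the original file): the untagged representation above tries
// `Event` first and falls through to `TestEvent` when a body does not contain the `Records`
// array of a real notification.
#[test]
fn test_sqs_event_untagged_test_event() {
    let event: SqsEvent = serde_json::from_str(
        r#"{"Service":"Amazon S3","Event":"s3:TestEvent","Bucket":"bucketname"}"#,
    )
    .unwrap();
    assert!(matches!(event, SqsEvent::TestEvent(_)));
}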

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
// <major>.<minor>
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
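
// Hedged sanity check (not part of the original file): exercises the `<major>.<minor>`
// parsing above and the conversion to `semver::Version` used by the
// `SUPPORTED_S3_EVENT_VERSION` check.
#[test]
fn test_s3_event_version_deserialize() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);

    let version: semver::Version = version.into();
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&version));

    // A bare major version is rejected because the minor component is required.
    assert!(serde_json::from_str::<S3EventVersion>(r#""2""#).is_err());
}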

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
//
// We could use enums here, but that seems overly brittle, as deserialization would break if
// AWS adds new event types or names.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
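
// Hedged example (not part of the original file): round-trips an `eventName` value through
// the custom `kind:name` (de)serializers above.
#[test]
fn test_s3_event_name_roundtrip() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");
    assert_eq!(
        serde_json::to_string(&name).unwrap(),
        r#""ObjectCreated:Put""#
    );
}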

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // S3 object keys are URL-encoded
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // AWS encodes spaces as `+` rather than `%20`, so we first need to handle this.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}
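
// Hedged companion example (not part of the original file): the serializer above
// percent-encodes every non-alphanumeric byte, so a space comes back as `%20` rather than
// the `+` form AWS uses on the wire.
#[test]
fn test_key_serialize() {
    let object = S3Object {
        key: "noog nork".to_string(),
    };
    assert_eq!(
        serde_json::to_string(&object).unwrap(),
        r#"{"key":"noog%20nork"}"#
    );
}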

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
            max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}
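
// Hedged sanity check (not part of the original file): the derived `Default` impl should
// reflect the documented defaults on `Config` above.
#[test]
fn config_defaults() {
    let config = Config::default();
    assert_eq!(config.poll_secs, 15);
    assert_eq!(config.visibility_timeout_secs, 300);
    assert_eq!(config.max_number_of_messages, 10);
    assert!(config.delete_message);
    assert!(config.delete_failed_message);
    assert!(config.client_concurrency.is_none());
}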