vector/sources/aws_s3/sqs.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
    time::{Duration, Instant},
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
    source_sender::SendError,
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    common::backoff::ExponentialBackoff,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, S3ObjectProcessingFailed, S3ObjectProcessingSucceeded,
        SqsMessageDeleteBatchError, SqsMessageDeletePartialError, SqsMessageDeleteSucceeded,
        SqsMessageProcessingError, SqsMessageProcessingSucceeded, SqsMessageReceiveError,
        SqsMessageReceiveSucceeded, SqsMessageSendBatchError, SqsMessageSentPartialError,
        SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored, StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

/// Configuration for deferring events based on their age.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the queue to forward events to when they are older than `max_age_secs`.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// The event must have been emitted within the last `max_age_secs` seconds to be processed.
    ///
    /// If the event is older, it is forwarded to the `queue_url` for later processing.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

/// SQS configuration options.
//
// TODO: It seems awfully likely that we could re-use the existing configuration type for the `aws_sqs` source in some
// way, given the near 100% overlap in configurable values.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this should not be changed unless instructed to do so: if messages are
    /// available, they are always consumed immediately, regardless of the value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a message
    /// is received and takes longer than `visibility_timeout_secs` to be processed and deleted
    /// from the queue, it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete non-retryable messages.
    ///
    /// If a message is rejected by the sink and not retryable, it is deleted from the queue.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the objects
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as the SQS message consumption rate affects the S3 object
    /// retrieval rate.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages to poll from SQS in a batch.
    ///
    /// Defaults to `10`. Valid values are 1 to 10.
    ///
    /// Should be set to a smaller value when the files are large, to help prevent the ingestion
    /// of one file from causing other files to exceed the `visibility_timeout_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Client timeout configuration for SQS operations. Take care when configuring these settings
    // to allow enough time for the polling interval configured in `poll_secs`.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    /// Configuration for deferring events to another queue based on their age.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {} (must be between 1 and 10)", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: vector_lib::source_sender::SendError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all of the processes to finish. If any one of them panics, we resume
        // the panic here to properly shut down Vector.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
    backoff: ExponentialBackoff,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
            backoff: ExponentialBackoff::default().max_delay(Duration::from_secs(30)),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                result = self.run_once() => {
                    match result {
                        Ok(()) => {
                            // Reset backoff on successful receive
                            self.backoff.reset();
                        }
                        Err(_) => {
                            let delay = self.backoff.next().expect("backoff never ends");
                            trace!(
                                delay_ms = delay.as_millis(),
                                "`run_once` failed, will retry after delay.",
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }
                },
            }
        }
    }

    async fn run_once(&mut self) -> Result<(), ()> {
        let messages = match self.receive_messages().await {
            Ok(messages) => {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
                messages
            }
            Err(err) => {
                emit!(SqsMessageReceiveError { error: &err });
                return Err(());
            }
        };

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    // This should never actually happen in practice; it is an artifact of the
                    // AWS API's predilection for returning nullable values for all response
                    // attributes.
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            // Maybe delete the message from the current queue, since we have processed it.
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        // TODO: Consider removing failed deferrals from `delete_entries`.
        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!("Deferred queue not configured, but received deferred entries.");
                return Ok(());
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            // We need these for a correct error message if the batch fails overall.
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    // Batch deletes can have partial successes/failures, so we have to check
                    // for both cases and emit accordingly.
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        Ok(())
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // S3 has to send notifications to a queue in the same region, so this should never
        // actually be hit unless messages are being forwarded from one queue to another.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let download_start = Instant::now();

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Record any read error seen so we can propagate it up later and avoid acking the
        // SQS message.
        //
        // `FramedRead` likely stops when it gets an I/O error, but it is clearer to show
        // that we `take_while` there hasn't been an error.
        //
        // This can result in objects being partially processed before an error, but we
        // prefer duplicate lines over message loss. Future work could include recording
        // the offset of the object that has been read, but this would only be relevant in
        // the case that the same Vector instance processes the same message.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // Error is handled by `codecs::Decoder`, no further handling
                    // is needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(SendError::Closed) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(SendError::Closed)
            }
            Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
        };

        // Up above, `lines` captures `read_error`, and eventually is captured by `stream`,
        // so we explicitly drop it so that we can again utilize `read_error` below.
        drop(stream);

        let bucket = &s3_event.s3.bucket.name;
        let duration = download_start.elapsed();

        if read_error.is_some() {
            emit!(S3ObjectProcessingFailed { bucket, duration });
        } else {
            emit!(S3ObjectProcessingSucceeded { bucket, duration });
        }

        // The BatchNotifier is cloned for each LogEvent in the batch stream, but the last
        // reference must be dropped before the status of the batch is sent to the channel.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // This handles the transition from the original timestamp logic. Originally the
    // `timestamp_key` was populated by the `last_modified` time on the object, falling
    // back to calling `now()`.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

// https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}
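
// A hypothetical test sketch (not in the upstream file): since `SqsEvent` is
// untagged, serde tries the `Event` variant first and falls back to `TestEvent`,
// so a test-event payload without `Records` should land in `TestEvent`.
#[test]
fn test_sqs_event_untagged_fallback() {
    let event: SqsEvent = serde_json::from_str(
        r#"{"Service":"Amazon S3","Event":"s3:TestEvent","Bucket":"bucketname"}"#,
    )
    .unwrap();
    assert!(matches!(event, SqsEvent::TestEvent(_)));
}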

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
// <major>.<minor>
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
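
// A hypothetical sketch (not in the upstream file) of the `<major>.<minor>`
// parsing above and its conversion to `semver::Version`.
#[test]
fn test_s3_event_version_roundtrip() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);

    // "2.1" maps to semver 2.1.0, which satisfies the supported "~2" requirement.
    let version: semver::Version = version.into();
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&version));

    // Serializing goes back through the `<major>.<minor>` notation.
    let json = serde_json::to_string(&S3EventVersion { major: 2, minor: 1 }).unwrap();
    assert_eq!(json, r#""2.1""#);
}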

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
//
// We could use enums here, but that seems overly brittle, as deserialization would break if
// AWS adds new event types or names.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .to_string();

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .to_string();

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
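
// A hypothetical sketch (not in the upstream file) of the `<kind>:<name>` split
// performed by the deserializer above; `splitn(2, ':')` keeps any further colons
// in the name.
#[test]
fn test_s3_event_name_roundtrip() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");

    let json = serde_json::to_string(&name).unwrap();
    assert_eq!(json, r#""ObjectCreated:Put""#);
}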

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // S3ObjectKeys are URL encoded
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // AWS encodes spaces as `+` rather than `%20`, so we first need to handle this.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}
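
// A hypothetical companion to `test_key_deserialize` (not in the upstream file):
// the serializer above percent-encodes every non-alphanumeric byte, so a space
// is emitted as `%20`.
#[test]
fn test_key_serialize() {
    let object = S3Object {
        key: "noog nork".to_string(),
    };
    assert_eq!(
        serde_json::to_string(&object).unwrap(),
        r#"{"key":"noog%20nork"}"#
    );
}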

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
            max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}