vector/sources/aws_s3/sqs.rs

use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsMessageSendBatchError,
        SqsMessageSentPartialError, SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

// Only S3 event notification versions matching `~2` (i.e. any 2.x) are supported.
static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

/// Configuration for deferring events based on their age.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the queue to forward events to when they are older than `max_age_secs`.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// An event must have been emitted within the last `max_age_secs` seconds to be processed.
    ///
    /// If the event is older, it is forwarded to the `queue_url` for later processing.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}
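
// A minimal sketch of the `deferred` table as TOML, assuming a hypothetical
// queue URL (see `parse_sqs_config` at the bottom of this file for a parsed
// example):
//
//   [deferred]
//   queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyDeferredQueue"
//   max_age_secs = 3600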

/// SQS configuration options.
//
// TODO: It seems awfully likely that we could re-use the existing configuration type for the `aws_sqs` source in some
// way, given the near 100% overlap in configurable values.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this does not need to be changed: this value is used as the SQS long-polling
    /// wait time, so if messages are available they are consumed immediately, regardless of the
    /// value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a received
    /// message takes longer than `visibility_timeout_secs` to be processed and deleted from the
    /// queue, it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete non-retryable messages.
    ///
    /// If a message is rejected by the sink and not retryable, it is deleted from the queue.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the objects
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as the SQS message consumption rate affects the S3 object
    /// retrieval rate.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages to poll from SQS in a batch.
    ///
    /// Defaults to 10. Valid values are 1 to 10.
    ///
    /// Should be set to a smaller value when the files are large, to help prevent the ingestion
    /// of one file from causing other messages in the batch to exceed `visibility_timeout_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Client timeout configuration for SQS operations. Take care when configuring these settings
    // to allow enough time for the polling interval configured in `poll_secs`.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    /// Configuration for deferring events to another queue based on their age.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}
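
// A minimal sketch of this configuration as TOML, assuming a hypothetical queue
// URL; every field other than `queue_url` falls back to the defaults defined
// just below:
//
//   queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
//   poll_secs = 15
//   visibility_timeout_secs = 300
//   delete_message = true
//   delete_failed_message = true
//   max_number_of_messages = 10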

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} is too old; forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all of the processes to finish. If any one of them panics, we resume
        // that panic here to properly shut down Vector.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                _ = self.run_once() => {},
            }
        }
    }

    async fn run_once(&mut self) {
        let messages = self.receive_messages().await;
        let messages = messages
            .inspect(|messages| {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
            })
            .inspect_err(|err| {
                emit!(SqsMessageReceiveError { error: err });
            })
            .unwrap_or_default();

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    // This is unlikely ever to happen, and is just an artifact of the AWS
                    // API's predilection for returning nullable values for all response
                    // attributes.
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            // Optionally delete the message from the current queue, since we have processed it.
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        // Future work: consider removing failed deferrals from `delete_entries`.
        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!(
                    message = "Deferred queue not configured, but received deferred entries.",
                    internal_log_rate_limit = true
                );
                return;
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            // We need these for a correct error message if the batch fails overall.
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    // Batch deletes can have partial successes/failures, so we have to check
                    // for both cases and emit accordingly.
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
    }

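    // The SQS message body handled below is either a raw S3 event notification, or an
    // S3 notification wrapped in an SNS envelope whose `Message` field holds the raw
    // notification as a JSON string. An abridged sketch of the two shapes:
    //
    //   {"Records":[{"eventVersion":"2.1","eventName":"ObjectCreated:Put",...}]}
    //   {"Type":"Notification","Message":"{\"Records\":[...]}",...}
    //
    // See `test_s3_sns_testevent` at the bottom of this file for a full SNS sample.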
    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // S3 has to send notifications to a queue in the same region, so this is unlikely
        // to ever be hit unless messages are being forwarded from one queue to another.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Record any read error so it can be propagated up later, ensuring we avoid
        // acking the SQS message.
        //
        // The error is moved into `read_error` inside the closure rather than returned
        // through the stream.
        //
        // FramedRead likely stops when it hits an I/O error, but it is clearer to show
        // via `take_while` that we stop once an error has occurred.
        //
        // This can result in objects being partially processed before an error, but we
        // prefer duplicate lines over message loss. Future work could include recording
        // the offset of the object that has been read, but this would only be relevant in
        // the case that the same Vector instance processes the same message.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // Error is handled by `codecs::Decoder`, no further handling
                    // is needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        // Up above, `lines` captures `read_error`, and eventually is captured by `stream`,
        // so we explicitly drop it so that we can again utilize `read_error` below.
        drop(stream);

        // The BatchNotifier is cloned for each LogEvent in the batch stream, but the last
        // reference must be dropped before the status of the batch is sent to the channel.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // This handles the transition from the original timestamp logic. Originally the
    // `timestamp_key` was populated by the `last_modified` time on the object, falling
    // back to calling `now()`.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}
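
// Under the Legacy namespace, the function above leaves events shaped roughly
// like this hypothetical example (field names taken from the `path!` calls and
// default schema keys above; the bucket, key, and region values are made up):
//
//   {
//     "message": "...",
//     "bucket": "my-bucket",
//     "object": "path/to/object.log",
//     "region": "us-east-2",
//     "source_type": "aws_s3",
//     "timestamp": "2020-01-01T00:00:00Z"
//   }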

// https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
// <major>.<minor>
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
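
// A small round-trip sketch of the `<major>.<minor>` parsing above and its
// conversion into `semver::Version`.
#[test]
fn test_s3_event_version_roundtrip() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);
    assert_eq!(serde_json::to_string(&version).unwrap(), r#""2.1""#);

    // The patch version is filled in as zero, which satisfies the `~2` requirement
    // in `SUPPORTED_S3_EVENT_VERSION`.
    let semver_version: semver::Version = version.into();
    assert_eq!(semver_version, semver::Version::new(2, 1, 0));
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&semver_version));
}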

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
//
// We could use enums here, but that seems overly brittle, as deserialization would break if AWS
// adds new event types or names.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
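
// A small round-trip sketch of the `<kind>:<name>` parsing above, using the
// standard "ObjectCreated:Put" event type from the AWS documentation.
#[test]
fn test_s3_event_name_roundtrip() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");
    assert_eq!(
        serde_json::to_string(&name).unwrap(),
        r#""ObjectCreated:Put""#
    );
}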

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // S3 object keys are URL-encoded.
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // AWS encodes spaces as `+` rather than `%20`, so we first need to handle this.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
            max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}