vector/sources/aws_s3/sqs.rs

use std::collections::HashMap;
use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock};

use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_sqs::operation::delete_message_batch::{
    DeleteMessageBatchError, DeleteMessageBatchOutput,
};
use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
use aws_sdk_sqs::operation::send_message_batch::{SendMessageBatchError, SendMessageBatchOutput};
use aws_sdk_sqs::types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry};
use aws_sdk_sqs::Client as SqsClient;
use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::codecs::decoding::FramingError;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};

use crate::codecs::Decoder;
use crate::event::{Event, LogEvent};
use crate::internal_events::{
    SqsMessageSendBatchError, SqsMessageSentPartialError, SqsMessageSentSucceeded,
};
use crate::{
    aws::AwsTimeout,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
    SourceSender,
};
use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
use vector_lib::event::MaybeAsLogMut;
use vector_lib::lookup::{metadata_path, path, PathPrefix};

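// NOTE: `~2` is a semver tilde requirement, matching any 2.x event version
// (>= 2.0.0, < 3.0.0), e.g. `2.0`, `2.1`, or `2.2`.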
static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

/// Configuration for deferring events based on their age.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the queue to forward events to when they are older than `max_age_secs`.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// An event must have been emitted within the last `max_age_secs` seconds to be processed.
    ///
    /// If the event is older, it is forwarded to the queue at `queue_url` for later processing.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

/// SQS configuration options.
//
// TODO: It seems awfully likely that we could re-use the existing configuration type for the `aws_sqs` source in some
// way, given the near 100% overlap in configurable values.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this should not be changed unless instructed to do so, since available
    /// messages are always consumed regardless of the value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a received
    /// message takes longer than `visibility_timeout_secs` to be processed and deleted from the
    /// queue, it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete non-retryable messages.
    ///
    /// If a message is rejected by the sink and not retryable, it is deleted from the queue.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the objects
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as the SQS message consumption rate affects the S3 object
    /// retrieval rate.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// Maximum number of messages to poll from SQS in a batch.
    ///
    /// Defaults to 10.
    ///
    /// Should be set to a smaller value when the files are large, to help prevent the ingestion of
    /// one file from causing other files to exceed the visibility timeout. Valid values are 1 to 10.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    // Client timeout configuration for SQS operations. Take care when configuring these settings
    // to allow enough time for the polling interval configured in `poll_secs`.
    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    /// Configuration for deferring events to another queue based on their age.
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}
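
// An illustrative sketch of how these options look as TOML under an `aws_s3`
// source's `sqs` table (the component name and the values shown here are
// assumptions for documentation purposes, not requirements):
//
//     [sources.my_s3_source.sqs]
//     queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
//     poll_secs = 15
//     visibility_timeout_secs = 300
//     delete_message = true
//     max_number_of_messages = 10
//
//     [sources.my_s3_source.sqs.deferred]
//     queue_url = "https://sqs.us-east-2.amazonaws.com/123456789012/MyDeferredQueue"
//     max_age_secs = 3600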

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} is too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all of the processes to finish. If any one of them panics, we resume
        // that panic here to properly shut down Vector.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await {
                if e.is_panic() {
                    panic::resume_unwind(e.into_panic());
                }
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                _ = self.run_once() => {},
            }
        }
    }

    async fn run_once(&mut self) {
        let messages = self.receive_messages().await;
        let messages = messages
            .inspect(|messages| {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
            })
            .inspect_err(|err| {
                emit!(SqsMessageReceiveError { error: err });
            })
            .unwrap_or_default();

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    // I don't think this will ever actually happen; it is just an
                    // artifact of the AWS API's predilection for returning nullable
                    // values for all response attributes.
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            // Maybe delete the message from the current queue, since we
                            // have processed it.
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        // We should consider removing failed deferrals from `delete_entries`.
        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!(
                    message = "Deferred queue not configured, but received deferred entries.",
                    internal_log_rate_limit = true
                );
                return;
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            // We need these for a correct error message if the batch fails overall.
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    // Batch deletes can have partial successes/failures, so we have to check
                    // for both cases and emit accordingly.
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // S3 has to send notifications to a queue in the same region, so I don't think
        // this will actually ever be hit unless messages are being forwarded from one
        // queue to another.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Record any read error seen so it can be propagated up later, ensuring we
        // avoid ack'ing the SQS message.
        //
        // FramedRead likely stops when it gets an I/O error, but I found it clearer to
        // show via `take_while` that we only continue while there hasn't been an error.
        //
        // This can result in objects being partially processed before an error, but we
        // prefer duplicate lines over message loss. Future work could include recording
        // the offset of the object that has been read, but this would only be relevant
        // in the case that the same Vector instance processes the same message.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // Error is handled by `codecs::Decoder`, no further handling
                    // is needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        // Up above, `lines` captures `read_error` and is eventually captured by
        // `stream`, so we explicitly drop `stream` here so that we can use `read_error`
        // again below.
        drop(stream);

        // The BatchNotifier is cloned for each LogEvent in the batch stream, but the last
        // reference must be dropped before the status of the batch is sent to the channel.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // This handles the transition from the original timestamp logic. Originally the
    // `timestamp_key` was populated by the `last_modified` time on the object, falling
    // back to calling `now()`.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

// https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

// https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-enable-disable-notification-intro.html
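// With `untagged`, serde tries the variants in declaration order: a payload with a
// `Records` array deserializes as `Event`, while the test-event shape falls back to
// `TestEvent`.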
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
// <major>.<minor>
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
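
// A small round-trip sketch (added for illustration; not part of the original test
// suite) of the `<major>.<minor>` encoding handled by the impls above.
#[test]
fn test_s3_event_version_roundtrip() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);
    assert_eq!(serde_json::to_string(&version).unwrap(), r#""2.1""#);
}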

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

// https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html#supported-notification-event-types
//
// We could use enums here, but that seems overly brittle, as deserialization would
// break if AWS adds new event types or names.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
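
// A companion sketch (likewise illustrative) for the `kind:name` encoding, using the
// well-known `ObjectCreated:Put` event name.
#[test]
fn test_s3_event_name_roundtrip() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");
    assert_eq!(serde_json::to_string(&name).unwrap(), r#""ObjectCreated:Put""#);
}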

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // S3 object keys are URL-encoded
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // AWS encodes spaces as `+` rather than `%20`, so we first need to handle this.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
     }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
     }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
            max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
            [deferred]
            queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}