use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
    time::{Duration, Instant},
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
    source_sender::SendError,
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    common::backoff::ExponentialBackoff,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, S3ObjectProcessingFailed, S3ObjectProcessingSucceeded,
        SqsMessageDeleteBatchError, SqsMessageDeletePartialError, SqsMessageDeleteSucceeded,
        SqsMessageProcessingError, SqsMessageProcessingSucceeded, SqsMessageReceiveError,
        SqsMessageReceiveSucceeded, SqsMessageSendBatchError, SqsMessageSentPartialError,
        SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored, StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

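/// Configuration for forwarding notifications for too-old S3 objects to a second SQS queue.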
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
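    /// The URL of the SQS queue that notifications for too-old objects are forwarded to.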
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

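    /// The maximum age of an S3 object, in seconds, before its notification is forwarded to
    /// the deferred queue instead of being processed.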
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

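/// SQS configuration options for the `aws_s3` source.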
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
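    /// The URL of the SQS queue to poll for bucket notifications.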
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

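    /// How long to wait while polling the queue for new messages, in seconds.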
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

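    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a received message stays invisible to other consumers. If the
    /// message is not deleted within this window, it becomes visible again and is redelivered.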
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

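    /// Whether to delete the message once it is processed successfully.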
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

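    /// Whether to delete the message even when the events it produced are rejected downstream.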
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

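    /// The number of tasks spawned for polling the queue and processing messages.
    ///
    /// If not set, this defaults to the number of threads available to Vector.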
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

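    /// The maximum number of messages to fetch per SQS receive call; must be between 1 and 10.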
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: vector_lib::source_sender::SendError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is for a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an S3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} is too old; forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

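/// State shared by all ingestion tasks: the AWS clients plus decoding and queue settings.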
pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

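/// Spawns and supervises the concurrent `IngestorProcess` tasks.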
pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all workers to finish, propagating any panics so they are not swallowed.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

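/// A single worker task: polls SQS, processes the S3 objects referenced by each message, and
/// deletes or forwards the messages according to the outcome.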
pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
    backoff: ExponentialBackoff,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
            backoff: ExponentialBackoff::default().max_delay(Duration::from_secs(30)),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                result = self.run_once() => {
                    match result {
                        Ok(()) => {
                            self.backoff.reset();
                        }
                        Err(_) => {
                            let delay = self.backoff.next().expect("backoff never ends");
                            trace!(
                                delay_ms = delay.as_millis(),
                                "`run_once` failed, will retry after delay.",
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }
                },
            }
        }
    }

    async fn run_once(&mut self) -> Result<(), ()> {
        let messages = match self.receive_messages().await {
            Ok(messages) => {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
                messages
            }
            Err(err) => {
                emit!(SqsMessageReceiveError { error: &err });
                return Err(());
            }
        };

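        // Track messages to delete (processed, or failed with deletion enabled) and messages
        // to forward to the deferred queue (objects older than `max_age_secs`).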
        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    warn!(
                        message = "Refusing to process message with no receipt_handle.",
                        ?message.message_id,
                    );
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }

                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        // Forward too-old notifications to the deferred queue in one batch.
        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!("Deferred queue not configured, but received deferred entries.");
                return Ok(());
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }

        // Delete processed messages from the queue in one batch.
        if !delete_entries.is_empty() {
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    // Batch deletion can partially succeed; report successes and failures
                    // separately.
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        Ok(())
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
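        // The body may be the S3 event itself or an SNS notification wrapping it; unwrap the
        // SNS envelope first if present, then fall back to treating the body as the raw event.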
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let download_start = Instant::now();

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Stream the object through the framer, counting received bytes as frames arrive. A
        // framing error ends the stream early; the error is stashed in `read_error` and
        // handled below, after the events decoded so far have been flushed downstream.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // Errors are handled by the decoder itself; no further handling is
                    // needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(SendError::Closed) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(SendError::Closed)
            }
            Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
        };

        // `stream` (via `lines`) holds the mutable borrow of `read_error`, so drop it before
        // inspecting the error below.
        drop(stream);

        let bucket = &s3_event.s3.bucket.name;
        let duration = download_start.elapsed();

        if read_error.is_some() {
            emit!(S3ObjectProcessingFailed { bucket, duration });
        } else {
            emit!(S3ObjectProcessingSucceeded { bucket, duration });
        }

        // Drop our own reference to the batch notifier so the acknowledgement receiver below
        // can resolve once the sinks have finished with the events.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // The timestamp is the object's last-modified time. Under the Vector namespace it is
    // stored in metadata; the Legacy namespace writes it to the event's timestamp key.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

/// The relevant fields of an SNS notification that wraps an S3 event.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

// The message is either a real S3 event or the "TestEvent" S3 sends when bucket
// notifications are first configured.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

// The shape of an S3 bucket notification; see the AWS "Event message structure" documentation.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

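// Event versions are "major.minor" strings (for example, "2.2"), so parse the two components
// manually rather than as a full semver version.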
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

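// Event names are "kind:name" strings, such as "ObjectCreated:Put"; split on the first colon.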
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
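    // S3 URL-encodes object keys in event notifications, so decode here to recover the key.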
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

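/// Serde helpers for S3's URL-encoded object keys, which may also encode spaces as `+`.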
mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // Replace `+` with ` ` before percent-decoding, matching how S3 encodes
                // spaces in object keys.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
            "Service":"Amazon S3",
            "Event":"s3:TestEvent",
            "Time":"2014-10-13T15:57:02.089Z",
            "Bucket":"bucketname",
            "RequestId":"5582815E1AEA5ADF",
            "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
        }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
            "Type" : "Notification",
            "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
            "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
            "Subject" : "Testing publish to subscribed queues",
            "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
            "Timestamp" : "2012-03-29T05:12:16.901Z",
            "SignatureVersion" : "1",
            "Signature" : "EXAMPLEnTrFPa3...",
            "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
            "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
        }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    // `deferred` requires both `queue_url` and `max_age_secs`; each alone must fail to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}