use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
    time::Duration,
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    common::backoff::ExponentialBackoff,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsMessageSendBatchError,
        SqsMessageSentPartialError, SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

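// Only major version 2 of the S3 event notification schema is supported
// (`eventVersion` values such as "2.1" or "2.2").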
static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

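/// Deferred-queue configuration: notifications for objects older than
/// `max_age_secs` are forwarded here instead of being processed.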
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
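    /// The URL of the SQS queue that old-object notifications are forwarded to.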
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

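    /// Maximum age, in seconds, measured from the notification's `eventTime`;
    /// notifications older than this are forwarded to the deferred queue.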
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

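/// Configuration for polling an SQS queue for S3 event notifications.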
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
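    /// The URL of the SQS queue to poll for bucket notifications.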
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

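    /// How long to wait while polling the queue for new messages, in seconds.
    /// Passed to SQS as the long-polling wait time.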
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

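    /// The visibility timeout to apply to received messages, in seconds.
    /// Messages that are neither deleted nor deferred within this window
    /// become visible to other consumers again.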
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

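    /// Whether to delete a message from the queue once it has been processed
    /// successfully.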
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

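    /// Whether to delete a message from the queue when the events it produced
    /// were rejected downstream (only relevant when end-to-end
    /// acknowledgements are enabled).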
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

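    /// Number of concurrent polling tasks to spawn. Defaults to the number of
    /// available threads.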
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

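    /// Maximum number of messages to fetch per receive call. SQS accepts
    /// values between 1 and 10.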
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

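    /// Optional deferred queue for notifications whose objects are too old.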
    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display(
        "Invalid value for max_number_of_messages: {} (must be between 1 and 10)",
        messages
    ))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is for a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

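/// State shared by every polling task spawned by an `Ingestor`.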
pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

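        // Wait for every polling task; a panic inside a task is re-raised
        // here so it is not silently swallowed by `tokio::spawn`.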
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
    backoff: ExponentialBackoff,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
            backoff: ExponentialBackoff::from_millis(2)
                .factor(250)
                .max_delay(Duration::from_secs(30)),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                result = self.run_once() => {
                    match result {
                        Ok(()) => {
                            self.backoff.reset();
                        }
                        Err(_) => {
                            let delay = self.backoff.next().expect("backoff never ends");
                            trace!(
                                delay_ms = delay.as_millis(),
                                "`run_once` failed, will retry after delay.",
                            );
                            tokio::time::sleep(delay).await;
                        }
                    }
                },
            }
        }
    }

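    /// One poll iteration: receive a batch of messages, process each one, then
    /// flush the accumulated deferred-queue sends and message deletions.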
    async fn run_once(&mut self) -> Result<(), ()> {
        let messages = match self.receive_messages().await {
            Ok(messages) => {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
                messages
            }
            Err(err) => {
                emit!(SqsMessageReceiveError { error: &err });
                return Err(());
            }
        };

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    // The receipt handle is required to delete or defer the
                    // message, so a message without one cannot be processed.
                    warn!(
                        message = "Refusing to process message with no receipt_handle.",
                        ?message.message_id
                    );
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!("Deferred queue not configured, but received deferred entries.");
                return Ok(());
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        Ok(())
    }

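    /// Processes one SQS message body, first unwrapping an SNS notification
    /// envelope when the event was delivered to SQS via an SNS topic.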
    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // The S3 client is bound to a single region; refuse notifications for
        // buckets that live elsewhere rather than fetching from the wrong
        // endpoint.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        let mut read_error = None;
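        // A framing error terminates the stream below (via `take_while`), but
        // is stashed here rather than returned immediately: events decoded
        // before the failure still flow through, and the error is surfaced
        // afterwards so the SQS message is not deleted and can be retried.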
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // The decoder emits its own error event here; skip the
                    // line and continue with the rest of the object.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        // The stream borrows `batch`, so it must be dropped first; dropping
        // `batch` then lets the acknowledgement receiver below resolve once
        // every event in the batch has been processed.
        drop(stream);
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(entries))
            .send()
            .await
    }
}

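/// Enriches a single decoded log event with its S3 context: bucket, object
/// key, region, any S3 object metadata, and a timestamp.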
fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // In the Vector namespace the object's last-modified time lands in event
    // metadata; in the Legacy namespace it becomes the event timestamp,
    // falling back to the current time when S3 did not supply one.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

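/// The subset of an SNS notification envelope needed to unwrap S3 events
/// that were routed to SQS through an SNS topic subscription.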
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

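/// An SQS message body is either a genuine S3 event notification or the test
/// event S3 publishes when notifications are first configured; `untagged`
/// makes serde try each shape in turn.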
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

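// `eventVersion` is a bare "major.minor" string such as "2.1", so it needs
// manual `Deserialize`/`Serialize` impls rather than derives.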
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

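// `eventName` values such as "ObjectCreated:Put" are split into a kind
// ("ObjectCreated") and a name ("Put"), again via manual impls.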
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .to_owned();

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .to_owned();

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
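    /// The object key, URL-decoded from the notification payload.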
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

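// S3 event notifications URL-encode object keys (using `+` for spaces);
// decode on deserialization and percent-encode back on serialization.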
mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // Replace `+` with a space before percent-decoding.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}
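
// Added round-trip check for the `urlencoded_string` helpers above: the
// serializer percent-encodes every non-alphanumeric byte, and deserializing
// that output restores the original key.
#[test]
fn test_key_roundtrip() {
    let object = S3Object {
        key: "noog nork+baz".to_string(),
    };
    let json = serde_json::to_string(&object).unwrap();
    assert_eq!(json, r#"{"key":"noog%20nork%2Bbaz"}"#);
    assert_eq!(object, serde_json::from_str::<S3Object>(&json).unwrap());
}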

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
        "Service":"Amazon S3",
        "Event":"s3:TestEvent",
        "Time":"2014-10-13T15:57:02.089Z",
        "Bucket":"bucketname",
        "RequestId":"5582815E1AEA5ADF",
        "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
    }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}
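
// Added check that `eventName` values such as "ObjectCreated:Put" split into
// kind/name and that `eventVersion` strings such as "2.1" parse into a
// version accepted by `SUPPORTED_S3_EVENT_VERSION`.
#[test]
fn test_event_name_and_version_deserialize() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");

    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&version.into()));
}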

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
        "Type" : "Notification",
        "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
        "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
        "Subject" : "Testing publish to subscribed queues",
        "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
        "Timestamp" : "2012-03-29T05:12:16.901Z",
        "SignatureVersion" : "1",
        "Signature" : "EXAMPLEnTrFPa3...",
        "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
        "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
    }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    // A deferred table without `queue_url` must fail to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    // A deferred table without `max_age_secs` must fail to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}