use std::{
    collections::HashMap,
    future::ready,
    num::NonZeroUsize,
    panic,
    sync::{Arc, LazyLock},
};

use aws_sdk_s3::{Client as S3Client, operation::get_object::GetObjectError};
use aws_sdk_sqs::{
    Client as SqsClient,
    operation::{
        delete_message_batch::{DeleteMessageBatchError, DeleteMessageBatchOutput},
        receive_message::ReceiveMessageError,
        send_message_batch::{SendMessageBatchError, SendMessageBatchOutput},
    },
    types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry},
};
use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::{
    codecs::decoding::FramingError,
    config::{LegacyKey, LogNamespace, log_schema},
    configurable::configurable_component,
    event::MaybeAsLogMut,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};

use crate::{
    SourceSender,
    aws::AwsTimeout,
    codecs::Decoder,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf, Event, LogEvent},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsMessageSendBatchError,
        SqsMessageSentPartialError, SqsMessageSentSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
};

static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());

/// Configuration for forwarding notifications about objects that exceed a
/// maximum age to a secondary queue instead of processing them.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the SQS queue that notifications for too-old objects are
    /// forwarded to.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// The maximum age of an S3 object, in seconds. Notifications for objects
    /// older than this are forwarded to the deferred queue rather than processed.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

/// SQS configuration options.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable to other consumers
    /// after it has been received, before it becomes visible again if not deleted.
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it has been processed.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete the message when processing fails or its events are
    /// rejected downstream.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// The number of concurrent tasks to create for polling the queue for messages.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// The maximum number of messages to poll from SQS in a single batch.
    ///
    /// Must be between 1 and 10.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display(
        "Invalid value for max_number_of_messages: {}. Must be between 1 and 10.",
        messages
    ))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is for a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an S3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} is too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all of the worker tasks to finish. If any of them panicked,
        // resume the panic here so shutdown proceeds properly.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await
                && e.is_panic()
            {
                panic::resume_unwind(e.into_panic());
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                _ = self.run_once() => {},
            }
        }
    }

    async fn run_once(&mut self) {
        let messages = self.receive_messages().await;
        let messages = messages
            .inspect(|messages| {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
            })
            .inspect_err(|err| {
                emit!(SqsMessageReceiveError { error: err });
            })
            .unwrap_or_default();

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!(
                    message = "Deferred queue not configured, but received deferred entries.",
                    internal_log_rate_limit = true
                );
                return;
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // S3 sends notifications to a queue in the same region, so this should
        // only occur if messages are being forwarded from one queue to another.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Record any read error so it can be propagated later and the SQS message
        // is not acknowledged. The `take_while` below stops the stream at the first
        // framing error, so an object may be partially processed before the error
        // surfaces; this favors at-least-once over exactly-once delivery.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // The decode error is logged by the decoder itself; drop the line.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        drop(stream);

        // The batch notifier must be dropped before the status receiver below can
        // resolve, since the receiver waits on all clones of the notifier.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    // In the Vector namespace the object's timestamp is stored in event metadata
    // and the ingest time is recorded separately; in the Legacy namespace the
    // configured timestamp key is set to the object's last-modified time, falling
    // back to the current time.
    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// `eventVersion` is a "<major>.<minor>" string, so it needs a manual
// `Deserialize` implementation.
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
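
// Added sanity check (not part of the original source, test name is our own):
// an `eventVersion` string such as "2.1" should parse into `S3EventVersion`,
// round-trip through serialization, and satisfy the `~2` requirement in
// `SUPPORTED_S3_EVENT_VERSION`, while a bare major version is rejected.
#[test]
fn test_s3_event_version() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);

    // Round-trips back to the same "<major>.<minor>" string.
    assert_eq!(serde_json::to_string(&version).unwrap(), r#""2.1""#);

    // A 2.x version satisfies the supported `~2` requirement.
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&version.into()));

    // A version without a minor component is rejected.
    assert!(serde_json::from_str::<S3EventVersion>(r#""2""#).is_err());
}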

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

impl<'de> Deserialize<'de> for S3EventName {
    // Event names are "<kind>:<name>" strings such as "ObjectCreated:Put". An enum
    // would be more precise, but deserializing into strings avoids breaking when
    // AWS adds a new event type or name.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
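
// Added sanity check (not part of the original source, test name is our own):
// event names such as "ObjectCreated:Put" should split on the first colon into
// kind and name, and serialize back to the same string.
#[test]
fn test_s3_event_name() {
    let name: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(name.kind, "ObjectCreated");
    assert_eq!(name.name, "Put");
    assert_eq!(serde_json::to_string(&name).unwrap(), r#""ObjectCreated:Put""#);
}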

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    #[serde(with = "urlencoded_string")]
    // S3 object keys are URL-encoded in event notifications.
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // AWS encodes spaces as `+` rather than `%20`, so replace them before decoding.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}
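
// Added round-trip check (not part of the original source, test name is our
// own): serialization percent-encodes every non-alphanumeric byte, so a space
// becomes "%20" rather than the "+" form accepted during deserialization.
#[test]
fn test_key_serialize() {
    let value = S3Object {
        key: "noog nork".to_string(),
    };
    assert_eq!(
        serde_json::to_string(&value).unwrap(),
        r#"{"key":"noog%20nork"}"#
    );
}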

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
            "Service":"Amazon S3",
            "Event":"s3:TestEvent",
            "Time":"2014-10-13T15:57:02.089Z",
            "Bucket":"bucketname",
            "RequestId":"5582815E1AEA5ADF",
            "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
        }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
            "Type" : "Notification",
            "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
            "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
            "Subject" : "Testing publish to subscribed queues",
            "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
            "Timestamp" : "2012-03-29T05:12:16.901Z",
            "SignatureVersion" : "1",
            "Signature" : "EXAMPLEnTrFPa3...",
            "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
            "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
        }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    // A `[deferred]` table without `queue_url` must fail to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    // A `[deferred]` table without `max_age_secs` must fail to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}