use std::collections::HashMap;
use std::{future::ready, num::NonZeroUsize, panic, sync::Arc, sync::LazyLock};

use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_sqs::operation::delete_message_batch::{
    DeleteMessageBatchError, DeleteMessageBatchOutput,
};
use aws_sdk_sqs::operation::receive_message::ReceiveMessageError;
use aws_sdk_sqs::operation::send_message_batch::{SendMessageBatchError, SendMessageBatchOutput};
use aws_sdk_sqs::types::{DeleteMessageBatchRequestEntry, Message, SendMessageBatchRequestEntry};
use aws_sdk_sqs::Client as SqsClient;
use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
use aws_smithy_runtime_api::client::result::SdkError;
use aws_types::region::Region;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use tokio::{pin, select};
use tokio_util::codec::FramedRead;
use tracing::Instrument;
use vector_lib::codecs::decoding::FramingError;
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};

use crate::codecs::Decoder;
use crate::event::{Event, LogEvent};
use crate::internal_events::{
    SqsMessageSendBatchError, SqsMessageSentPartialError, SqsMessageSentSucceeded,
};
use crate::{
    aws::AwsTimeout,
    config::{SourceAcknowledgementsConfig, SourceContext},
    event::{BatchNotifier, BatchStatus, EstimatedJsonEncodedSizeOf},
    internal_events::{
        EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
        SqsMessageDeleteSucceeded, SqsMessageProcessingError, SqsMessageProcessingSucceeded,
        SqsMessageReceiveError, SqsMessageReceiveSucceeded, SqsS3EventRecordInvalidEventIgnored,
        StreamClosedError,
    },
    line_agg::{self, LineAgg},
    shutdown::ShutdownSignal,
    sources::aws_s3::AwsS3Config,
    tls::TlsConfig,
    SourceSender,
};
use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
use vector_lib::event::MaybeAsLogMut;
use vector_lib::lookup::{metadata_path, path, PathPrefix};

static SUPPORTED_S3_EVENT_VERSION: LazyLock<semver::VersionReq> =
    LazyLock::new(|| semver::VersionReq::parse("~2").unwrap());
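
// A quick sanity check of the version requirement above: `~2` accepts any 2.x
// event version but rejects other major versions. This is an illustrative
// sketch, not part of the upstream test suite.
#[test]
fn supported_s3_event_version_matches_2_x() {
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&semver::Version::new(2, 0, 0)));
    assert!(SUPPORTED_S3_EVENT_VERSION.matches(&semver::Version::new(2, 1, 0)));
    assert!(!SUPPORTED_S3_EVENT_VERSION.matches(&semver::Version::new(3, 0, 0)));
}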

/// Configuration for forwarding notifications for objects that exceed a maximum
/// age to a separate deferred queue.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub(super) struct DeferredConfig {
    /// The URL of the deferred SQS queue.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// The maximum age of an object, in seconds, before its notification is
    /// forwarded to the deferred queue instead of being processed.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 3600))]
    pub(super) max_age_secs: u64,
}

/// SQS configuration options.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub(super) struct Config {
    /// The URL of the SQS queue to poll for bucket notifications.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    #[configurable(validation(format = "uri"))]
    pub(super) queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    pub(super) poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a
    /// message is not processed and deleted within this window, it becomes available
    /// again for another consumer.
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Whether to delete the message from the queue if the events it produced are
    /// rejected by the sink.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_failed_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    #[configurable(metadata(docs::type_unit = "tasks"))]
    #[configurable(metadata(docs::examples = 5))]
    pub(super) client_concurrency: Option<NonZeroUsize>,

    /// The maximum number of messages to poll from SQS in a single batch.
    ///
    /// Must be between 1 and 10.
    #[serde(default = "default_max_number_of_messages")]
    #[derivative(Default(value = "default_max_number_of_messages()"))]
    #[configurable(metadata(docs::human_name = "Max Messages"))]
    #[configurable(metadata(docs::examples = 1))]
    pub(super) max_number_of_messages: u32,

    #[configurable(derived)]
    #[serde(default)]
    #[derivative(Default)]
    pub(super) tls_options: Option<TlsConfig>,

    #[configurable(derived)]
    #[derivative(Default)]
    #[serde(default)]
    #[serde(flatten)]
    pub(super) timeout: Option<AwsTimeout>,

    #[configurable(derived)]
    pub(super) deferred: Option<DeferredConfig>,
}

const fn default_poll_secs() -> u32 {
    15
}

const fn default_visibility_timeout_secs() -> u32 {
    300
}

const fn default_max_number_of_messages() -> u32 {
    10
}

const fn default_true() -> bool {
    true
}
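
// An illustrative sketch of how the defaults above interact with explicit TOML
// overrides. The queue URL is a placeholder; this test is not part of the
// upstream suite.
#[test]
fn parse_sqs_config_overrides() {
    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        poll_secs = 10
        visibility_timeout_secs = 120
        delete_message = false
        max_number_of_messages = 5
        client_concurrency = 2
        "#,
    )
    .unwrap();
    assert_eq!(config.poll_secs, 10);
    assert_eq!(config.visibility_timeout_secs, 120);
    assert!(!config.delete_message);
    // Unset fields fall back to their declared defaults.
    assert!(config.delete_failed_message);
    assert_eq!(config.max_number_of_messages, 5);
    assert_eq!(config.client_concurrency.map(NonZeroUsize::get), Some(2));
}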

#[derive(Debug, Snafu)]
pub(super) enum IngestorNewError {
    #[snafu(display("Invalid value for max_number_of_messages: {}", messages))]
    InvalidNumberOfMessages { messages: u32 },
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Snafu)]
pub enum ProcessingError {
    #[snafu(display(
        "Could not parse SQS message with id {} as S3 notification: {}",
        message_id,
        source
    ))]
    InvalidSqsMessage {
        source: serde_json::Error,
        message_id: String,
    },
    #[snafu(display("Failed to fetch s3://{}/{}: {}", bucket, key, source))]
    GetObject {
        source: SdkError<GetObjectError, HttpResponse>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to read all of s3://{}/{}: {}", bucket, key, source))]
    ReadObject {
        source: Box<dyn FramingError>,
        bucket: String,
        key: String,
    },
    #[snafu(display("Failed to flush all of s3://{}/{}: {}", bucket, key, source))]
    PipelineSend {
        source: crate::source_sender::ClosedError,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "Object notification for s3://{}/{} is for a bucket in another region: {}",
        bucket,
        key,
        region
    ))]
    WrongRegion {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display("Unsupported S3 event version: {}.", version))]
    UnsupportedS3EventVersion { version: semver::Version },
    #[snafu(display(
        "Sink reported an error sending events for an s3 object in region {}: s3://{}/{}",
        region,
        bucket,
        key
    ))]
    ErrorAcknowledgement {
        region: String,
        bucket: String,
        key: String,
    },
    #[snafu(display(
        "File s3://{}/{} too old. Forwarded to deferred queue {}",
        bucket,
        key,
        deferred_queue
    ))]
    FileTooOld {
        bucket: String,
        key: String,
        deferred_queue: String,
    },
}

pub struct State {
    region: Region,

    s3_client: S3Client,
    sqs_client: SqsClient,

    multiline: Option<line_agg::Config>,
    compression: super::Compression,

    queue_url: String,
    poll_secs: i32,
    max_number_of_messages: i32,
    client_concurrency: usize,
    visibility_timeout_secs: i32,
    delete_message: bool,
    delete_failed_message: bool,
    decoder: Decoder,

    deferred: Option<DeferredConfig>,
}

pub(super) struct Ingestor {
    state: Arc<State>,
}

impl Ingestor {
    pub(super) async fn new(
        region: Region,
        sqs_client: SqsClient,
        s3_client: S3Client,
        config: Config,
        compression: super::Compression,
        multiline: Option<line_agg::Config>,
        decoder: Decoder,
    ) -> Result<Ingestor, IngestorNewError> {
        if config.max_number_of_messages < 1 || config.max_number_of_messages > 10 {
            return Err(IngestorNewError::InvalidNumberOfMessages {
                messages: config.max_number_of_messages,
            });
        }
        let state = Arc::new(State {
            region,

            s3_client,
            sqs_client,

            compression,
            multiline,

            queue_url: config.queue_url,
            poll_secs: config.poll_secs as i32,
            max_number_of_messages: config.max_number_of_messages as i32,
            client_concurrency: config
                .client_concurrency
                .map(|n| n.get())
                .unwrap_or_else(crate::num_threads),
            visibility_timeout_secs: config.visibility_timeout_secs as i32,
            delete_message: config.delete_message,
            delete_failed_message: config.delete_failed_message,
            decoder,

            deferred: config.deferred,
        });

        Ok(Ingestor { state })
    }

    pub(super) async fn run(
        self,
        cx: SourceContext,
        acknowledgements: SourceAcknowledgementsConfig,
        log_namespace: LogNamespace,
    ) -> Result<(), ()> {
        let acknowledgements = cx.do_acknowledgements(acknowledgements);
        let mut handles = Vec::new();
        for _ in 0..self.state.client_concurrency {
            let process = IngestorProcess::new(
                Arc::clone(&self.state),
                cx.out.clone(),
                cx.shutdown.clone(),
                log_namespace,
                acknowledgements,
            );
            let fut = process.run();
            let handle = tokio::spawn(fut.in_current_span());
            handles.push(handle);
        }

        // Wait for all workers to finish, propagating any panic from a worker task.
        for handle in handles.drain(..) {
            if let Err(e) = handle.await {
                if e.is_panic() {
                    panic::resume_unwind(e.into_panic());
                }
            }
        }

        Ok(())
    }
}

pub struct IngestorProcess {
    state: Arc<State>,
    out: SourceSender,
    shutdown: ShutdownSignal,
    acknowledgements: bool,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}

impl IngestorProcess {
    pub fn new(
        state: Arc<State>,
        out: SourceSender,
        shutdown: ShutdownSignal,
        log_namespace: LogNamespace,
        acknowledgements: bool,
    ) -> Self {
        Self {
            state,
            out,
            shutdown,
            acknowledgements,
            log_namespace,
            bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
            events_received: register!(EventsReceived),
        }
    }

    async fn run(mut self) {
        let shutdown = self.shutdown.clone().fuse();
        pin!(shutdown);

        loop {
            select! {
                _ = &mut shutdown => break,
                _ = self.run_once() => {},
            }
        }
    }

    async fn run_once(&mut self) {
        let messages = self.receive_messages().await;
        let messages = messages
            .inspect(|messages| {
                emit!(SqsMessageReceiveSucceeded {
                    count: messages.len(),
                });
            })
            .inspect_err(|err| {
                emit!(SqsMessageReceiveError { error: err });
            })
            .unwrap_or_default();

        let mut delete_entries = Vec::new();
        let mut deferred_entries = Vec::new();
        for message in messages {
            let receipt_handle = match message.receipt_handle {
                None => {
                    warn!(message = "Refusing to process message with no receipt_handle.", ?message.message_id);
                    continue;
                }
                Some(ref handle) => handle.to_owned(),
            };

            let message_id = message
                .message_id
                .clone()
                .unwrap_or_else(|| "<unknown>".to_owned());
            match self.handle_sqs_message(message.clone()).await {
                Ok(()) => {
                    emit!(SqsMessageProcessingSucceeded {
                        message_id: &message_id
                    });
                    if self.state.delete_message {
                        trace!(
                            message = "Queued SQS message for deletion.",
                            id = message_id,
                            receipt_handle = receipt_handle,
                        );
                        delete_entries.push(
                            DeleteMessageBatchRequestEntry::builder()
                                .id(message_id.clone())
                                .receipt_handle(receipt_handle)
                                .build()
                                .expect("all required builder params specified"),
                        );
                    }
                }
                Err(err) => {
                    match err {
                        ProcessingError::FileTooOld { .. } => {
                            emit!(SqsMessageProcessingSucceeded {
                                message_id: &message_id
                            });
                            if let Some(deferred) = &self.state.deferred {
                                trace!(
                                    message = "Forwarding message to deferred queue.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                    deferred_queue = deferred.queue_url,
                                );

                                deferred_entries.push(
                                    SendMessageBatchRequestEntry::builder()
                                        .id(message_id.clone())
                                        .message_body(message.body.unwrap_or_default())
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                            if self.state.delete_message {
                                trace!(
                                    message = "Queued SQS message for deletion.",
                                    id = message_id,
                                    receipt_handle = receipt_handle,
                                );
                                delete_entries.push(
                                    DeleteMessageBatchRequestEntry::builder()
                                        .id(message_id)
                                        .receipt_handle(receipt_handle)
                                        .build()
                                        .expect("all required builder params specified"),
                                );
                            }
                        }
                        _ => {
                            emit!(SqsMessageProcessingError {
                                message_id: &message_id,
                                error: &err,
                            });
                        }
                    }
                }
            }
        }

        if !deferred_entries.is_empty() {
            let Some(deferred) = &self.state.deferred else {
                warn!(
                    message = "Deferred queue not configured, but received deferred entries.",
                    internal_log_rate_limit = true
                );
                return;
            };
            let cloned_entries = deferred_entries.clone();
            match self
                .send_messages(deferred_entries, deferred.queue_url.clone())
                .await
            {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageSentSucceeded {
                            message_ids: result.successful,
                        })
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageSentPartialError {
                            entries: result.failed
                        })
                    }
                }
                Err(err) => {
                    emit!(SqsMessageSendBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
        if !delete_entries.is_empty() {
            let cloned_entries = delete_entries.clone();
            match self.delete_messages(delete_entries).await {
                Ok(result) => {
                    if !result.successful.is_empty() {
                        emit!(SqsMessageDeleteSucceeded {
                            message_ids: result.successful,
                        });
                    }

                    if !result.failed.is_empty() {
                        emit!(SqsMessageDeletePartialError {
                            entries: result.failed
                        });
                    }
                }
                Err(err) => {
                    emit!(SqsMessageDeleteBatchError {
                        entries: cloned_entries,
                        error: err,
                    });
                }
            }
        }
    }

    async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
        let sqs_body = message.body.unwrap_or_default();
        let sqs_body = serde_json::from_str::<SnsNotification>(sqs_body.as_ref())
            .map(|notification| notification.message)
            .unwrap_or(sqs_body);
        let s3_event: SqsEvent =
            serde_json::from_str(sqs_body.as_ref()).context(InvalidSqsMessageSnafu {
                message_id: message
                    .message_id
                    .clone()
                    .unwrap_or_else(|| "<empty>".to_owned()),
            })?;

        match s3_event {
            SqsEvent::TestEvent(_s3_test_event) => {
                debug!(?message.message_id, message = "Found S3 Test Event.");
                Ok(())
            }
            SqsEvent::Event(s3_event) => self.handle_s3_event(s3_event).await,
        }
    }

    async fn handle_s3_event(&mut self, s3_event: S3Event) -> Result<(), ProcessingError> {
        for record in s3_event.records {
            self.handle_s3_event_record(record, self.log_namespace)
                .await?
        }
        Ok(())
    }

    async fn handle_s3_event_record(
        &mut self,
        s3_event: S3EventRecord,
        log_namespace: LogNamespace,
    ) -> Result<(), ProcessingError> {
        let event_version: semver::Version = s3_event.event_version.clone().into();
        if !SUPPORTED_S3_EVENT_VERSION.matches(&event_version) {
            return Err(ProcessingError::UnsupportedS3EventVersion {
                version: event_version.clone(),
            });
        }

        if s3_event.event_name.kind != "ObjectCreated" {
            emit!(SqsS3EventRecordInvalidEventIgnored {
                bucket: &s3_event.s3.bucket.name,
                key: &s3_event.s3.object.key,
                kind: &s3_event.event_name.kind,
                name: &s3_event.event_name.name,
            });
            return Ok(());
        }

        // The S3 client is bound to a single region, so notifications for buckets in
        // other regions cannot be fetched and are surfaced as errors instead.
        if self.state.region.as_ref() != s3_event.aws_region.as_str() {
            return Err(ProcessingError::WrongRegion {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
                region: s3_event.aws_region,
            });
        }

        if let Some(deferred) = &self.state.deferred {
            let delta = Utc::now() - s3_event.event_time;
            if delta.num_seconds() > deferred.max_age_secs as i64 {
                return Err(ProcessingError::FileTooOld {
                    bucket: s3_event.s3.bucket.name.clone(),
                    key: s3_event.s3.object.key.clone(),
                    deferred_queue: deferred.queue_url.clone(),
                });
            }
        }

        let object_result = self
            .state
            .s3_client
            .get_object()
            .bucket(s3_event.s3.bucket.name.clone())
            .key(s3_event.s3.object.key.clone())
            .send()
            .await
            .context(GetObjectSnafu {
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            });

        let object = object_result?;

        debug!(
            message = "Got S3 object from SQS notification.",
            bucket = s3_event.s3.bucket.name,
            key = s3_event.s3.object.key,
        );

        let metadata = object.metadata;

        let timestamp = object.last_modified.map(|ts| {
            Utc.timestamp_opt(ts.secs(), ts.subsec_nanos())
                .single()
                .expect("invalid timestamp")
        });

        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let object_reader = super::s3_object_decoder(
            self.state.compression,
            &s3_event.s3.object.key,
            object.content_encoding.as_deref(),
            object.content_type.as_deref(),
            object.body,
        )
        .await;

        // Track any framing error so it can be surfaced after the stream is drained.
        // An object may be partially processed before the error; duplicate lines on
        // retry are preferred over data loss, so the SQS message is not acknowledged
        // when this is set.
        let mut read_error = None;
        let bytes_received = self.bytes_received.clone();
        let events_received = self.events_received.clone();
        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = Box::new(
            FramedRead::new(object_reader, self.state.decoder.framer.clone())
                .map(|res| {
                    res.inspect(|bytes| {
                        bytes_received.emit(ByteSize(bytes.len()));
                    })
                    .map_err(|err| {
                        read_error = Some(err);
                    })
                    .ok()
                })
                .take_while(|res| ready(res.is_some()))
                .map(|r| r.expect("validated by take_while")),
        );

        let lines: Box<dyn Stream<Item = Bytes> + Send + Unpin> = match &self.state.multiline {
            Some(config) => Box::new(
                LineAgg::new(
                    lines.map(|line| ((), line, ())),
                    line_agg::Logic::new(config.clone()),
                )
                .map(|(_src, line, _context, _lastline_context)| line),
            ),
            None => lines,
        };

        let mut stream = lines.flat_map(|line| {
            let events = match self.state.decoder.deserializer_parse(line) {
                Ok((events, _events_size)) => events,
                Err(_error) => {
                    // The error is logged by `codecs::Decoder`, so no further
                    // handling is needed here.
                    SmallVec::new()
                }
            };

            let events = events
                .into_iter()
                .map(|mut event: Event| {
                    event = event.with_batch_notifier_option(&batch);
                    if let Some(log_event) = event.maybe_as_log_mut() {
                        handle_single_log(
                            log_event,
                            log_namespace,
                            &s3_event,
                            &metadata,
                            timestamp,
                        );
                    }
                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
                    event
                })
                .collect::<Vec<Event>>();
            futures::stream::iter(events)
        });

        let send_error = match self.out.send_event_stream(&mut stream).await {
            Ok(_) => None,
            Err(_) => {
                let (count, _) = stream.size_hint();
                emit!(StreamClosedError { count });
                Some(crate::source_sender::ClosedError)
            }
        };

        // `stream` captures `read_error` through the `lines` closure above, so it
        // must be dropped before `read_error` can be inspected below.
        drop(stream);

        // Drop the original batch notifier so that the receiver can resolve once
        // every event's copy of it has been acknowledged.
        drop(batch);

        if let Some(error) = read_error {
            Err(ProcessingError::ReadObject {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else if let Some(error) = send_error {
            Err(ProcessingError::PipelineSend {
                source: error,
                bucket: s3_event.s3.bucket.name.clone(),
                key: s3_event.s3.object.key.clone(),
            })
        } else {
            match receiver {
                None => Ok(()),
                Some(receiver) => {
                    let result = receiver.await;
                    match result {
                        BatchStatus::Delivered => {
                            debug!(
                                message = "S3 object from SQS delivered.",
                                bucket = s3_event.s3.bucket.name,
                                key = s3_event.s3.object.key,
                            );
                            Ok(())
                        }
                        BatchStatus::Errored => Err(ProcessingError::ErrorAcknowledgement {
                            bucket: s3_event.s3.bucket.name,
                            key: s3_event.s3.object.key,
                            region: s3_event.aws_region,
                        }),
                        BatchStatus::Rejected => {
                            if self.state.delete_failed_message {
                                warn!(
                                    message =
                                        "S3 object from SQS was rejected. Deleting failed message.",
                                    bucket = s3_event.s3.bucket.name,
                                    key = s3_event.s3.object.key,
                                );
                                Ok(())
                            } else {
                                Err(ProcessingError::ErrorAcknowledgement {
                                    bucket: s3_event.s3.bucket.name,
                                    key: s3_event.s3.object.key,
                                    region: s3_event.aws_region,
                                })
                            }
                        }
                    }
                }
            }
        }
    }

    async fn receive_messages(
        &mut self,
    ) -> Result<Vec<Message>, SdkError<ReceiveMessageError, HttpResponse>> {
        self.state
            .sqs_client
            .receive_message()
            .queue_url(self.state.queue_url.clone())
            .max_number_of_messages(self.state.max_number_of_messages)
            .visibility_timeout(self.state.visibility_timeout_secs)
            .wait_time_seconds(self.state.poll_secs)
            .send()
            .map_ok(|res| res.messages.unwrap_or_default())
            .await
    }

    async fn delete_messages(
        &mut self,
        entries: Vec<DeleteMessageBatchRequestEntry>,
    ) -> Result<DeleteMessageBatchOutput, SdkError<DeleteMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .delete_message_batch()
            .queue_url(self.state.queue_url.clone())
            .set_entries(Some(entries))
            .send()
            .await
    }

    async fn send_messages(
        &mut self,
        entries: Vec<SendMessageBatchRequestEntry>,
        queue_url: String,
    ) -> Result<SendMessageBatchOutput, SdkError<SendMessageBatchError, HttpResponse>> {
        self.state
            .sqs_client
            .send_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(entries))
            .send()
            .await
    }
}

fn handle_single_log(
    log: &mut LogEvent,
    log_namespace: LogNamespace,
    s3_event: &S3EventRecord,
    metadata: &Option<HashMap<String, String>>,
    timestamp: Option<DateTime<Utc>>,
) {
    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("bucket"))),
        path!("bucket"),
        Bytes::from(s3_event.s3.bucket.name.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("object"))),
        path!("object"),
        Bytes::from(s3_event.s3.object.key.as_bytes().to_vec()),
    );

    log_namespace.insert_source_metadata(
        AwsS3Config::NAME,
        log,
        Some(LegacyKey::Overwrite(path!("region"))),
        path!("region"),
        Bytes::from(s3_event.aws_region.as_bytes().to_vec()),
    );

    if let Some(metadata) = metadata {
        for (key, value) in metadata {
            log_namespace.insert_source_metadata(
                AwsS3Config::NAME,
                log,
                Some(LegacyKey::Overwrite(path!(key))),
                path!("metadata", key.as_str()),
                value.clone(),
            );
        }
    }

    log_namespace.insert_vector_metadata(
        log,
        log_schema().source_type_key(),
        path!("source_type"),
        Bytes::from_static(AwsS3Config::NAME.as_bytes()),
    );

    match log_namespace {
        LogNamespace::Vector => {
            if let Some(timestamp) = timestamp {
                log.insert(metadata_path!(AwsS3Config::NAME, "timestamp"), timestamp);
            }

            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
        }
        LogNamespace::Legacy => {
            if let Some(timestamp_key) = log_schema().timestamp_key() {
                log.try_insert(
                    (PathPrefix::Event, timestamp_key),
                    timestamp.unwrap_or_else(Utc::now),
                );
            }
        }
    };
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SnsNotification {
    pub message: String,
    pub timestamp: DateTime<Utc>,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
enum SqsEvent {
    Event(S3Event),
    TestEvent(S3TestEvent),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3TestEvent {
    pub service: String,
    pub event: S3EventName,
    pub bucket: String,
}
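
// An illustrative sketch of how the untagged `SqsEvent` above distinguishes real
// bucket notifications from S3 test events. Both JSON bodies are hypothetical but
// follow the documented notification shapes; this test is not part of the upstream
// suite.
#[test]
fn sqs_event_untagged_distinguishes_variants() {
    // No "Records" key, so the `Event` variant fails and `TestEvent` is tried next.
    let test_event: SqsEvent = serde_json::from_str(
        r#"{"Service":"Amazon S3","Event":"s3:TestEvent","Bucket":"bucketname"}"#,
    )
    .unwrap();
    assert!(matches!(test_event, SqsEvent::TestEvent(_)));

    // A minimal object-created notification parses as the `Event` variant.
    let event: SqsEvent = serde_json::from_str(
        r#"{
            "Records": [{
                "eventVersion": "2.1",
                "eventSource": "aws:s3",
                "awsRegion": "us-east-1",
                "eventName": "ObjectCreated:Put",
                "eventTime": "2024-01-01T00:00:00.000Z",
                "s3": {
                    "bucket": { "name": "bucketname" },
                    "object": { "key": "some%20key" }
                }
            }]
        }"#,
    )
    .unwrap();
    assert!(matches!(event, SqsEvent::Event(_)));
}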

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub struct S3Event {
    pub records: Vec<S3EventRecord>,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3EventRecord {
    pub event_version: S3EventVersion,
    pub event_source: String,
    pub aws_region: String,
    pub event_name: S3EventName,
    pub event_time: DateTime<Utc>,

    pub s3: S3Message,
}

#[derive(Clone, Debug)]
pub struct S3EventVersion {
    pub major: u64,
    pub minor: u64,
}

impl From<S3EventVersion> for semver::Version {
    fn from(v: S3EventVersion) -> semver::Version {
        semver::Version::new(v.major, v.minor, 0)
    }
}

// Parses versions of the form "<major>.<minor>", as found in the "eventVersion"
// field of S3 event notifications.
impl<'de> Deserialize<'de> for S3EventVersion {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, '.');

        let major = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing major version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        let minor = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing minor version number"))?
            .parse::<u64>()
            .map_err(D::Error::custom)?;

        Ok(S3EventVersion { major, minor })
    }
}

impl Serialize for S3EventVersion {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}.{}", self.major, self.minor))
    }
}
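
// A sketch checking the round-trip behavior of the custom serde impls above,
// plus the `semver` conversion; not part of the upstream suite.
#[test]
fn s3_event_version_roundtrip() {
    let version: S3EventVersion = serde_json::from_str(r#""2.1""#).unwrap();
    assert_eq!(version.major, 2);
    assert_eq!(version.minor, 1);
    assert_eq!(
        semver::Version::from(version.clone()),
        semver::Version::new(2, 1, 0)
    );
    assert_eq!(serde_json::to_string(&version).unwrap(), r#""2.1""#);
}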

#[derive(Clone, Debug)]
pub struct S3EventName {
    pub kind: String,
    pub name: String,
}

// Parses event names of the form "<kind>:<name>", for example "ObjectCreated:Put".
// Plain strings are used rather than enums so that deserialization does not break
// when AWS introduces new event kinds or names.
impl<'de> Deserialize<'de> for S3EventName {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        use serde::de::Error;

        let s = String::deserialize(deserializer)?;

        let mut parts = s.splitn(2, ':');

        let kind = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event kind"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        let name = parts
            .next()
            .ok_or_else(|| D::Error::custom("Missing event name"))?
            .parse::<String>()
            .map_err(D::Error::custom)?;

        Ok(S3EventName { kind, name })
    }
}

impl Serialize for S3EventName {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        serializer.serialize_str(&format!("{}:{}", self.kind, self.name))
    }
}
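
// A companion round-trip sketch for the custom serde impls above; not part of the
// upstream suite.
#[test]
fn s3_event_name_roundtrip() {
    let event: S3EventName = serde_json::from_str(r#""ObjectCreated:Put""#).unwrap();
    assert_eq!(event.kind, "ObjectCreated");
    assert_eq!(event.name, "Put");
    assert_eq!(
        serde_json::to_string(&event).unwrap(),
        r#""ObjectCreated:Put""#
    );
}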

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Message {
    pub bucket: S3Bucket,
    pub object: S3Object,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Bucket {
    pub name: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct S3Object {
    // The object key arrives URL-encoded in the notification and is decoded on
    // deserialization.
    #[serde(with = "urlencoded_string")]
    pub key: String,
}

mod urlencoded_string {
    use percent_encoding::{percent_decode, utf8_percent_encode};

    pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
    where
        D: serde::de::Deserializer<'de>,
    {
        use serde::de::Error;

        serde::de::Deserialize::deserialize(deserializer).and_then(|s: &[u8]| {
            let decoded = if s.contains(&b'+') {
                // S3 event notifications encode spaces in object keys as `+`,
                // which `percent_decode` does not handle, so convert them first.
                let s = s
                    .iter()
                    .map(|c| if *c == b'+' { b' ' } else { *c })
                    .collect::<Vec<_>>();
                percent_decode(&s).decode_utf8().map(Into::into)
            } else {
                percent_decode(s).decode_utf8().map(Into::into)
            };

            decoded
                .map_err(|err| D::Error::custom(format!("error url decoding S3 object key: {err}")))
        })
    }

    pub fn serialize<S>(s: &str, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(
            &utf8_percent_encode(s, percent_encoding::NON_ALPHANUMERIC).collect::<String>(),
        )
    }
}

#[test]
fn test_key_deserialize() {
    let value = serde_json::from_str(r#"{"key": "noog+nork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog nork".to_string(),
        },
        value
    );

    let value = serde_json::from_str(r#"{"key": "noog%2bnork"}"#).unwrap();
    assert_eq!(
        S3Object {
            key: "noog+nork".to_string(),
        },
        value
    );
}
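
// A companion sketch for the serializer half of `urlencoded_string` above: spaces
// come back percent-encoded rather than as `+`. Not part of the upstream suite.
#[test]
fn test_key_serialize() {
    let object = S3Object {
        key: "noog nork".to_string(),
    };
    assert_eq!(
        serde_json::to_string(&object).unwrap(),
        r#"{"key":"noog%20nork"}"#
    );
}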

#[test]
fn test_s3_testevent() {
    let value: S3TestEvent = serde_json::from_str(
        r#"{
            "Service":"Amazon S3",
            "Event":"s3:TestEvent",
            "Time":"2014-10-13T15:57:02.089Z",
            "Bucket":"bucketname",
            "RequestId":"5582815E1AEA5ADF",
            "HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
        }"#,
    )
    .unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn test_s3_sns_testevent() {
    let sns_value: SnsNotification = serde_json::from_str(
        r#"{
            "Type" : "Notification",
            "MessageId" : "63a3f6b6-d533-4a47-aef9-fcf5cf758c76",
            "TopicArn" : "arn:aws:sns:us-west-2:123456789012:MyTopic",
            "Subject" : "Testing publish to subscribed queues",
            "Message" : "{\"Bucket\":\"bucketname\",\"Event\":\"s3:TestEvent\",\"HostId\":\"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE\",\"RequestId\":\"5582815E1AEA5ADF\",\"Service\":\"Amazon S3\",\"Time\":\"2014-10-13T15:57:02.089Z\"}",
            "Timestamp" : "2012-03-29T05:12:16.901Z",
            "SignatureVersion" : "1",
            "Signature" : "EXAMPLEnTrFPa3...",
            "SigningCertURL" : "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-f3ecfb7224c7233fe7bb5f59f96de52f.pem",
            "UnsubscribeURL" : "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:123456789012:MyTopic:c7fe3a54-ab0e-4ec2-88e0-db410a0f2bee"
        }"#,
    ).unwrap();

    assert_eq!(
        sns_value.timestamp,
        DateTime::parse_from_rfc3339("2012-03-29T05:12:16.901Z")
            .unwrap()
            .to_utc()
    );

    let value: S3TestEvent = serde_json::from_str(sns_value.message.as_ref()).unwrap();

    assert_eq!(value.service, "Amazon S3".to_string());
    assert_eq!(value.bucket, "bucketname".to_string());
    assert_eq!(value.event.kind, "s3".to_string());
    assert_eq!(value.event.name, "TestEvent".to_string());
}

#[test]
fn parse_sqs_config() {
    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    assert!(config.deferred.is_none());

    let config: Config = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        max_age_secs = 3600
        "#,
    )
    .unwrap();
    assert_eq!(
        config.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
    );
    let Some(deferred) = config.deferred else {
        panic!("Expected deferred config");
    };
    assert_eq!(
        deferred.queue_url,
        "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
    );
    assert_eq!(deferred.max_age_secs, 3600);

    // A deferred table missing `queue_url` fails to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        max_age_secs = 3600
        "#,
    );
    assert!(test.is_err());

    // A deferred table missing `max_age_secs` also fails to parse.
    let test: Result<Config, toml::de::Error> = toml::from_str(
        r#"
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"
        [deferred]
        queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/MyDeferredQueue"
        "#,
    );
    assert!(test.is_err());
}