use std::convert::TryInto;

use async_compression::tokio::bufread;
use aws_smithy_types::byte_stream::ByteStream;
use futures::{TryStreamExt, stream, stream::StreamExt};
use snafu::Snafu;
use tokio_util::io::StreamReader;
use vector_lib::{
    codecs::{
        NewlineDelimitedDecoderConfig,
        decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
    },
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::owned_value_path,
};
use vrl::value::{Kind, kind::Collection};

use super::util::MultilineConfig;
use crate::{
    aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
    codecs::DecodingConfig,
    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
    config::{
        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
    },
    line_agg,
    serde::{bool_or_struct, default_decoding},
    tls::TlsConfig,
};

pub mod sqs;

/// The compression scheme of the objects retrieved from S3.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The scheme is derived from the object's `Content-Encoding` and `Content-Type` metadata and
    /// from its key suffix (for example, `.gz`), falling back to `none` if it cannot be determined.
    #[default]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}

/// The strategy to use to consume objects from S3.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "lowercase")]
enum Strategy {
    /// Consume S3 objects in response to bucket-notification events delivered to an SQS queue.
    #[default]
    Sqs,
}

/// Configuration for the `aws_s3` source.
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used to decompress objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    sqs: Option<sqs::Config>,

    /// The ARN of an IAM role to assume at startup.
    ///
    /// Deprecated; use `auth.assume_role` instead.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Whether to use path-style addressing when connecting to S3 (the bucket name is part of the
    /// URL path rather than the hostname).
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}

const fn default_framing() -> FramingConfig {
    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
    })
}

const fn default_true() -> bool {
    true
}

impl_generate_config_from_default!(AwsS3Config);
154
155#[async_trait::async_trait]
156#[typetag::serde(name = "aws_s3")]
157impl SourceConfig for AwsS3Config {
158 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
159 let log_namespace = cx.log_namespace(self.log_namespace);
160
161 let multiline_config: Option<line_agg::Config> = self
162 .multiline
163 .as_ref()
164 .map(|config| config.try_into())
165 .transpose()?;
166
167 match self.strategy {
168 Strategy::Sqs => Ok(Box::pin(
169 self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
170 .await?
171 .run(cx, self.acknowledgements, log_namespace),
172 )),
173 }
174 }
175
176 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
177 let log_namespace = global_log_namespace.merge(self.log_namespace);
178 let mut schema_definition = self
179 .decoding
180 .schema_definition(log_namespace)
181 .with_source_metadata(
182 Self::NAME,
183 Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
184 &owned_value_path!("bucket"),
185 Kind::bytes(),
186 None,
187 )
188 .with_source_metadata(
189 Self::NAME,
190 Some(LegacyKey::Overwrite(owned_value_path!("object"))),
191 &owned_value_path!("object"),
192 Kind::bytes(),
193 None,
194 )
195 .with_source_metadata(
196 Self::NAME,
197 Some(LegacyKey::Overwrite(owned_value_path!("region"))),
198 &owned_value_path!("region"),
199 Kind::bytes(),
200 None,
201 )
202 .with_source_metadata(
203 Self::NAME,
204 None,
205 &owned_value_path!("timestamp"),
206 Kind::timestamp(),
207 Some("timestamp"),
208 )
209 .with_standard_vector_source_metadata()
210 .with_source_metadata(
212 Self::NAME,
213 None,
214 &owned_value_path!("metadata"),
215 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
216 None,
217 );
218
219 if log_namespace == LogNamespace::Legacy {
221 schema_definition = schema_definition.unknown_fields(Kind::bytes());
222 }
223
224 vec![SourceOutput::new_maybe_logs(
225 self.decoding.output_type(),
226 schema_definition,
227 )]
228 }
229
230 fn can_acknowledge(&self) -> bool {
231 true
232 }
233}

impl AwsS3Config {
    async fn create_sqs_ingestor(
        &self,
        multiline: Option<line_agg::Config>,
        proxy: &ProxyConfig,
        log_namespace: LogNamespace,
    ) -> crate::Result<sqs::Ingestor> {
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        let s3_client = create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(self.force_path_style),
            },
            &self.auth,
            region.clone(),
            endpoint.clone(),
            proxy,
            self.tls_options.as_ref(),
            None,
        )
        .await?;

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        match self.sqs {
            Some(ref sqs) => {
                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
                    &SqsClientBuilder {},
                    &self.auth,
                    region.clone(),
                    endpoint,
                    proxy,
                    sqs.tls_options.as_ref(),
                    sqs.timeout.as_ref(),
                )
                .await?;

                let ingestor = sqs::Ingestor::new(
                    region,
                    sqs_client,
                    s3_client,
                    sqs.clone(),
                    self.compression,
                    multiline,
                    decoder,
                )
                .await?;

                Ok(ingestor)
            }
            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
        }
    }
}

#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}

async fn s3_object_decoder(
    compression: Compression,
    key: &str,
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    mut body: ByteStream,
) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
    let first = match body.next().await {
        Some(first) => first,
        _ => {
            return Box::new(tokio::io::empty());
        }
    };

    let r = tokio::io::BufReader::new(StreamReader::new(
        stream::iter(Some(first))
            .chain(Box::pin(async_stream::stream! {
                while let Some(next) = body.next().await {
                    yield next;
                }
            }))
            .map_err(std::io::Error::other),
    ));

    let compression = match compression {
        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
        _ => compression,
    };

    use Compression::*;
    match compression {
        // `Auto` is resolved to a concrete scheme above.
        Auto => unreachable!(),
        None => Box::new(r),
        Gzip => Box::new({
            let mut decoder = bufread::GzipDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
        Zstd => Box::new({
            let mut decoder = bufread::ZstdDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
    }
}

/// Determine the compression scheme of an S3 object from its `Content-Encoding` metadata, then
/// its `Content-Type` metadata, and finally its key extension, returning `None` if no scheme can
/// be determined.
fn determine_compression(
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    key: &str,
) -> Option<Compression> {
    content_encoding
        .and_then(content_encoding_to_compression)
        .or_else(|| content_type.and_then(content_type_to_compression))
        .or_else(|| object_key_to_compression(key))
}

fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
    match content_encoding {
        "gzip" => Some(Compression::Gzip),
        "zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn content_type_to_compression(content_type: &str) -> Option<Compression> {
    match content_type {
        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
        "application/zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn object_key_to_compression(key: &str) -> Option<Compression> {
    let extension = std::path::Path::new(key)
        .extension()
        .and_then(std::ffi::OsStr::to_str);

    use Compression::*;
    extension.and_then(|extension| match extension {
        "gz" => Some(Gzip),
        "zst" => Some(Zstd),
        _ => Option::None,
    })
}

#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

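    // A minimal sketch (not from the original suite) of the wire names implied by
    // `#[serde(rename_all = "lowercase")]` on `Compression`; assumes `serde_json` is available
    // in this crate, as it is in the integration tests below.
    #[test]
    fn compression_parses_lowercase_names() {
        let parsed: Compression = serde_json::from_str("\"gzip\"").unwrap();
        assert_eq!(parsed, Compression::Gzip);

        let parsed: Compression = serde_json::from_str("\"auto\"").unwrap();
        assert_eq!(parsed, Compression::Auto);
    }
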
    #[test]
    fn determine_compression() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for case in cases {
            let (key, content_encoding, content_type, expected) = case;
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }
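
    // An illustrative sketch (not part of the original tests) of the precedence implemented by
    // `determine_compression`: `Content-Encoding` wins over `Content-Type`, which wins over the
    // key extension.
    #[test]
    fn determine_compression_precedence() {
        use super::Compression;

        // A conflicting key extension loses to an explicit `Content-Encoding`.
        assert_eq!(
            super::determine_compression(Some("gzip"), None, "out.log.zst"),
            Some(Compression::Gzip)
        );
        // `Content-Type` is consulted when `Content-Encoding` is absent.
        assert_eq!(
            super::determine_compression(None, Some("application/zstd"), "out.log"),
            Some(Compression::Zstd)
        );
        // The key extension is the last resort.
        assert_eq!(
            super::determine_compression(None, None, "out.log.zst"),
            Some(Compression::Zstd)
        );
    }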

    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }
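
    // Two illustrative sketches, not from the original suite. The first spot-checks the
    // `Derivative(Default)` values that `impl_generate_config_from_default!` relies on; the
    // second shows that `Compression::None` makes `s3_object_decoder` a plain pass-through and
    // assumes `ByteStream: From<Vec<u8>>`, which the integration tests below also rely on.
    #[test]
    fn default_config_values() {
        let config = AwsS3Config::default();
        assert_eq!(config.compression, Compression::Auto);
        assert!(config.force_path_style);
        assert!(config.sqs.is_none());
    }

    #[tokio::test]
    async fn decode_passthrough_uncompressed() {
        let payload = b"hello\nworld\n".to_vec();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::None,
            "out.log",
            None,
            None,
            ByteStream::from(payload.clone()),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert_eq!(data, payload);
    }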
}

#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };

    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
        lookup::path,
    };
    use vrl::value::Value;

    use super::*;
    use crate::{
        SourceSender,
        aws::{AwsAuthentication, RegionOrEndpoint, create_client},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{S3ClientBuilder, sqs::S3Event},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            lines_from_gzip_file, random_lines, trace_init,
        },
    };

    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let file = io::BufReader::new(File::open(path).unwrap());
        file.lines().map(|x| x.unwrap()).collect()
    }

    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| format!(r#"{{"message": "{msg}"}}"#))
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }

    fn s3_address() -> String {
        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
    }

    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            let mut s3_event: S3Event = serde_json::from_str(
                r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            // Send the bucket-notification event to the queue the source is polling.
            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {
                if let Some(schema_definition) =
                    config.outputs(namespace).pop().unwrap().schema_definition
                {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket"))
                        .unwrap(),
                    &bucket.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object"))
                        .unwrap(),
                    &key.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region"))
                        .unwrap(),
                    &"us-east-1".into()
                );
            }

            // Give the source time to finalize and (n)ack the SQS message, then verify whether
            // the message was deleted from the queue for the given status.
            tokio::time::sleep(Duration::from_secs(10)).await;
            match status {
                Errored => {
                    // Errored events are retried, so the message should still be in the queue.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    // With `delete_failed_message = false`, rejected events also leave the
                    // message in the queue.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }

    /// Creates a new SQS queue with a short visibility timeout, returning its URL.
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }

    /// Returns the number of messages that can currently be received from the queue.
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let sqs_result = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        sqs_result
            .messages
            .map(|messages| messages.len())
            .unwrap_or(0)
    }

    /// Creates a new S3 bucket with a random name, returning the name.
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }

    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }

    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
}