1use std::convert::TryInto;
2
3use async_compression::tokio::bufread;
4use aws_smithy_types::byte_stream::ByteStream;
5use futures::{stream, stream::StreamExt, TryStreamExt};
6use snafu::Snafu;
7use tokio_util::io::StreamReader;
8use vector_lib::codecs::decoding::{
9 DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions,
10};
11use vector_lib::codecs::NewlineDelimitedDecoderConfig;
12use vector_lib::config::{LegacyKey, LogNamespace};
13use vector_lib::configurable::configurable_component;
14use vector_lib::lookup::owned_value_path;
15use vrl::value::{kind::Collection, Kind};
16
17use super::util::MultilineConfig;
18use crate::codecs::DecodingConfig;
19use crate::{
20 aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint},
21 common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
22 config::{
23 ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
24 },
25 line_agg,
26 serde::{bool_or_struct, default_decoding},
27 tls::TlsConfig,
28};
29
30pub mod sqs;
31
/// Compression scheme of the objects retrieved from S3.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// Detection checks the `Content-Encoding` header, the `Content-Type`
    /// header, and finally the object key suffix, in that order
    /// (see `determine_compression`).
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}
57
/// Strategy for consuming objects from AWS S3.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
enum Strategy {
    /// Consume S3 objects by listening for bucket-notification events
    /// delivered through an SQS queue. Currently the only strategy.
    #[derivative(Default)]
    Sqs,
}
70
/// Configuration for the `aws_s3` source.
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    /// The AWS region and/or custom endpoint to use (flattened into the
    /// top-level source options).
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used to decompress objects retrieved from S3.
    compression: Compression,

    // Hidden because `sqs` is currently the only supported strategy.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    ///
    /// Required when `strategy` is `sqs` (see `create_sqs_ingestor`).
    sqs: Option<sqs::Config>,

    // Deprecated option — hidden from docs; presumably superseded by
    // `auth.assume_role` (NOTE(review): confirm, it is never read here).
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// TLS options used when connecting to S3.
    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Whether to use path-style addressing when connecting to S3.
    /// Defaults to `true`; passed through to the S3 client builder.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}
141
/// Default framing: newline-delimited, with no maximum line length.
///
/// Built literally (rather than via `Default`) so it can be a `const fn`,
/// which the `derivative` default expression on `AwsS3Config` relies on.
const fn default_framing() -> FramingConfig {
    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
    })
}
148
/// Serde/derivative default for boolean options that default to `true`
/// (e.g. `force_path_style`).
const fn default_true() -> bool {
    true
}

// Provides `GenerateConfig` for this source from its `Default` impl.
impl_generate_config_from_default!(AwsS3Config);
154
#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        // Resolve the effective log namespace from the global setting and the
        // per-source override.
        let log_namespace = cx.log_namespace(self.log_namespace);

        // Translate the user-facing multiline options into the internal
        // line-aggregation config, surfacing any validation error.
        let multiline_config: Option<line_agg::Config> = self
            .multiline
            .as_ref()
            .map(|config| config.try_into())
            .transpose()?;

        match self.strategy {
            // SQS is currently the only ingestion strategy.
            Strategy::Sqs => Ok(Box::pin(
                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?
                    .run(cx, self.acknowledgements, log_namespace),
            )),
        }
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        // Declare the metadata fields this source attaches to each event:
        // bucket, object key, region, event timestamp, plus the standard
        // Vector source metadata.
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
                &owned_value_path!("bucket"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
                &owned_value_path!("object"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
                &owned_value_path!("region"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata()
            // Arbitrary object metadata: an object of unknown string fields,
            // possibly absent.
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // In the Legacy namespace any additional (unknown) fields on the event
        // are typed as bytes.
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        // This source supports end-to-end acknowledgements.
        true
    }
}
234
235impl AwsS3Config {
236 async fn create_sqs_ingestor(
237 &self,
238 multiline: Option<line_agg::Config>,
239 proxy: &ProxyConfig,
240 log_namespace: LogNamespace,
241 ) -> crate::Result<sqs::Ingestor> {
242 let region = self.region.region();
243 let endpoint = self.region.endpoint();
244
245 let s3_client = create_client::<S3ClientBuilder>(
246 &S3ClientBuilder {
247 force_path_style: Some(self.force_path_style),
248 },
249 &self.auth,
250 region.clone(),
251 endpoint.clone(),
252 proxy,
253 self.tls_options.as_ref(),
254 None,
255 )
256 .await?;
257
258 let decoder =
259 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
260 .build()?;
261
262 match self.sqs {
263 Some(ref sqs) => {
264 let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
265 &SqsClientBuilder {},
266 &self.auth,
267 region.clone(),
268 endpoint,
269 proxy,
270 sqs.tls_options.as_ref(),
271 sqs.timeout.as_ref(),
272 )
273 .await?;
274
275 let ingestor = sqs::Ingestor::new(
276 region,
277 sqs_client,
278 s3_client,
279 sqs.clone(),
280 self.compression,
281 multiline,
282 decoder,
283 )
284 .await?;
285
286 Ok(ingestor)
287 }
288 None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
289 }
290 }
291}
292
/// Errors that can occur while constructing the SQS-based ingestor.
#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    /// `strategy = "sqs"` was selected but no `sqs` section was provided.
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}
298
299async fn s3_object_decoder(
301 compression: Compression,
302 key: &str,
303 content_encoding: Option<&str>,
304 content_type: Option<&str>,
305 mut body: ByteStream,
306) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
307 let first = match body.next().await {
308 Some(first) => first,
309 _ => {
310 return Box::new(tokio::io::empty());
311 }
312 };
313
314 let r = tokio::io::BufReader::new(StreamReader::new(
315 stream::iter(Some(first))
316 .chain(Box::pin(async_stream::stream! {
317 while let Some(next) = body.next().await {
318 yield next;
319 }
320 }))
321 .map_err(std::io::Error::other),
322 ));
323
324 let compression = match compression {
325 Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
326 _ => compression,
327 };
328
329 use Compression::*;
330 match compression {
331 Auto => unreachable!(), None => Box::new(r),
333 Gzip => Box::new({
334 let mut decoder = bufread::GzipDecoder::new(r);
335 decoder.multiple_members(true);
336 decoder
337 }),
338 Zstd => Box::new({
339 let mut decoder = bufread::ZstdDecoder::new(r);
340 decoder.multiple_members(true);
341 decoder
342 }),
343 }
344}
345
346fn determine_compression(
353 content_encoding: Option<&str>,
354 content_type: Option<&str>,
355 key: &str,
356) -> Option<Compression> {
357 content_encoding
358 .and_then(content_encoding_to_compression)
359 .or_else(|| content_type.and_then(content_type_to_compression))
360 .or_else(|| object_key_to_compression(key))
361}
362
363fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
364 match content_encoding {
365 "gzip" => Some(Compression::Gzip),
366 "zstd" => Some(Compression::Zstd),
367 _ => None,
368 }
369}
370
371fn content_type_to_compression(content_type: &str) -> Option<Compression> {
372 match content_type {
373 "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
374 "application/zstd" => Some(Compression::Zstd),
375 _ => None,
376 }
377}
378
379fn object_key_to_compression(key: &str) -> Option<Compression> {
380 let extension = std::path::Path::new(key)
381 .extension()
382 .and_then(std::ffi::OsStr::to_str);
383
384 use Compression::*;
385 extension.and_then(|extension| match extension {
386 "gz" => Some(Gzip),
387 "zst" => Some(Zstd),
388 _ => Option::None,
389 })
390}
391
#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    /// Table-driven check of `determine_compression`: `Content-Encoding` wins,
    /// then `Content-Type`, then the key extension; unknown inputs yield `None`.
    #[test]
    fn determine_compression() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for case in cases {
            let (key, content_encoding, content_type, expected) = case;
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }

    /// An empty object with `Content-Encoding: gzip` must decode to empty
    /// output (via the empty-body short-circuit) instead of erroring in the
    /// gzip decoder.
    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }
}
443
#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };

    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{types::QueueAttributeName, Client as SqsClient};
    use similar_asserts::assert_eq;
    use vector_lib::codecs::{decoding::DeserializerConfig, JsonDeserializerConfig};
    use vector_lib::lookup::path;
    use vrl::value::Value;

    use super::*;
    use crate::{
        aws::{create_client, AwsAuthentication, RegionOrEndpoint},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{sqs::S3Event, S3ClientBuilder},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{assert_source_compliance, SOURCE_TAGS},
            lines_from_gzip_file, random_lines, trace_init,
        },
        SourceSender,
    };

    /// Reads a plaintext fixture file into a vector of its lines.
    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let file = io::BufReader::new(File::open(path).unwrap());
        file.lines().map(|x| x.unwrap()).collect()
    }

    // Each test below round-trips a payload through S3 + SQS against the
    // endpoint returned by `s3_address` (a localstack-style emulator by
    // default) and asserts on the events the source emits. `test_event` does
    // the heavy lifting.

    /// A plain newline-delimited object is ingested line by line.
    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// JSON-encoded lines decoded with the JSON codec yield the inner
    /// `message` values.
    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| {
                format!(r#"{{"message": "{msg}"}}"#)
            })
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }

    /// Same as `s3_process_message` but with the Vector log namespace enabled.
    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// Object keys containing spaces are handled correctly.
    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// Object keys containing characters that require URL-encoding (e.g. `:`)
    /// are handled correctly.
    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// A gzip-compressed object (with `Content-Encoding: gzip`) is
    /// decompressed before decoding.
    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// A multi-member gzip object decodes all members, not just the first
    /// (exercises `multiple_members(true)` in the decoder).
    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// A multi-member zstd object decodes all members, not just the first.
    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// Multiline aggregation collapses the configured start/condition pattern
    /// span into a single event.
    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // NOTE(review): ignored — presumably flaky or slow; confirm before
    // re-enabling.
    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// Rejected (failed) events result in the SQS message being deleted by
    /// default.
    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    /// With `delete_failed_message = false`, rejected events leave the SQS
    /// message on the queue.
    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }

    /// Endpoint of the S3/SQS emulator; overridable via `S3_ADDRESS`.
    fn s3_address() -> String {
        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
    }

    /// Builds a source config pointed at the test endpoint/queue with fast
    /// polling and acknowledgements enabled.
    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }

    /// Drives a full end-to-end scenario: uploads `payload` to a fresh bucket,
    /// posts a matching S3 bucket-notification to a fresh SQS queue, runs the
    /// source, and asserts on the emitted events, their metadata, and the
    /// final state of the queue given `status`.
    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            // `custom_options` is a loosely-typed option bag; only
            // `delete_failed_message` (bool) is currently understood.
            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            // Template bucket-notification event; the bucket name and object
            // key are patched below to point at the object just uploaded.
            let mut s3_event: S3Event = serde_json::from_str(
                r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {
                // Each event must satisfy the schema the source advertises.
                if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    // Vector namespace: the message is the event root value.
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
            }

            // Give acknowledgement handling time to settle before inspecting
            // the queue.
            tokio::time::sleep(Duration::from_secs(10)).await;
            match status {
                Errored => {
                    // Message must still be on the queue.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    // Deletion of failed messages was opted out of, so the
                    // message must remain.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    // Delivered (or rejected-with-deletion): queue is drained.
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }

    /// Creates a uniquely-named SQS queue and returns its URL.
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }

    /// Returns how many messages a single `ReceiveMessage` call (long-polling
    /// for up to `wait_time_seconds`) observes on the queue.
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let sqs_result = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        sqs_result
            .messages
            .map(|messages| messages.len())
            .unwrap_or(0)
    }

    /// Creates a uniquely-named S3 bucket and returns its name.
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }

    /// Builds an S3 client pointed at the test endpoint with test credentials.
    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }

    /// Builds an SQS client pointed at the test endpoint with test credentials.
    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
}