use std::convert::TryInto;

use async_compression::tokio::bufread;
use aws_smithy_types::byte_stream::ByteStream;
use futures::{TryStreamExt, stream, stream::StreamExt};
use snafu::Snafu;
use tokio_util::io::StreamReader;
use vector_lib::{
    codecs::{
        NewlineDelimitedDecoderConfig,
        decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
    },
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::owned_value_path,
};
use vrl::value::{Kind, kind::Collection};

use super::util::MultilineConfig;
use crate::{
    aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
    codecs::DecodingConfig,
    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
    config::{
        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
    },
    line_agg,
    serde::{bool_or_struct, default_decoding},
    tls::TlsConfig,
};

pub mod sqs;

/// Compression scheme of objects retrieved from S3.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined from its `Content-Encoding` and
    /// `Content-Type` metadata, as well as the key suffix (for example, `.gz`).
    ///
    /// It is treated as `none` if the compression scheme cannot be determined.
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}

/// Strategies for consuming objects from S3.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
enum Strategy {
    /// Consume objects by processing bucket notification events sent to an SQS queue.
    #[derivative(Default)]
    Sqs,
}

/// Configuration for the `aws_s3` source.
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    sqs: Option<sqs::Config>,

    /// The ARN of an IAM role to assume at startup.
    ///
    /// Deprecated; use `auth.assume_role` instead.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Whether to use path-style addressing when accessing the S3 bucket.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}

const fn default_framing() -> FramingConfig {
    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
    })
}

const fn default_true() -> bool {
    true
}

impl_generate_config_from_default!(AwsS3Config);

#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let multiline_config: Option<line_agg::Config> = self
            .multiline
            .as_ref()
            .map(|config| config.try_into())
            .transpose()?;

        match self.strategy {
            Strategy::Sqs => Ok(Box::pin(
                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?
                    .run(cx, self.acknowledgements, log_namespace),
            )),
        }
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
                &owned_value_path!("bucket"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
                &owned_value_path!("object"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
                &owned_value_path!("region"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

impl AwsS3Config {
    async fn create_sqs_ingestor(
        &self,
        multiline: Option<line_agg::Config>,
        proxy: &ProxyConfig,
        log_namespace: LogNamespace,
    ) -> crate::Result<sqs::Ingestor> {
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        let s3_client = create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(self.force_path_style),
            },
            &self.auth,
            region.clone(),
            endpoint.clone(),
            proxy,
            self.tls_options.as_ref(),
            None,
        )
        .await?;

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        match self.sqs {
            Some(ref sqs) => {
                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
                    &SqsClientBuilder {},
                    &self.auth,
                    region.clone(),
                    endpoint,
                    proxy,
                    sqs.tls_options.as_ref(),
                    sqs.timeout.as_ref(),
                )
                .await?;

                let ingestor = sqs::Ingestor::new(
                    region,
                    sqs_client,
                    s3_client,
                    sqs.clone(),
                    self.compression,
                    multiline,
                    decoder,
                )
                .await?;

                Ok(ingestor)
            }
            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
        }
    }
}

#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}

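/// Returns an `AsyncRead` over the (possibly decompressed) object body.
///
/// With `Compression::Auto`, the scheme is inferred from the object's metadata
/// and key via `determine_compression`; an empty body short-circuits to an
/// empty reader.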
async fn s3_object_decoder(
    compression: Compression,
    key: &str,
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    mut body: ByteStream,
) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
    let first = match body.next().await {
        Some(first) => first,
        _ => {
            return Box::new(tokio::io::empty());
        }
    };

    let r = tokio::io::BufReader::new(StreamReader::new(
        stream::iter(Some(first))
            .chain(Box::pin(async_stream::stream! {
                while let Some(next) = body.next().await {
                    yield next;
                }
            }))
            .map_err(std::io::Error::other),
    ));

    // `use` declarations are visible throughout the enclosing block, so `Auto`
    // and `None` below refer to `Compression` variants, not `Option`.
    use Compression::*;

    let compression = match compression {
        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
        _ => compression,
    };

    match compression {
        Auto => unreachable!(), // mapped to a concrete scheme above
        None => Box::new(r),
        Gzip => Box::new({
            let mut decoder = bufread::GzipDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
        Zstd => Box::new({
            let mut decoder = bufread::ZstdDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
    }
}

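/// Infers an object's compression scheme, preferring (in order) its
/// `Content-Encoding` header, its `Content-Type` header, and the file
/// extension of its key (for example, `.gz` or `.zst`).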
fn determine_compression(
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    key: &str,
) -> Option<Compression> {
    content_encoding
        .and_then(content_encoding_to_compression)
        .or_else(|| content_type.and_then(content_type_to_compression))
        .or_else(|| object_key_to_compression(key))
}

fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
    match content_encoding {
        "gzip" => Some(Compression::Gzip),
        "zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn content_type_to_compression(content_type: &str) -> Option<Compression> {
    match content_type {
        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
        "application/zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn object_key_to_compression(key: &str) -> Option<Compression> {
    let extension = std::path::Path::new(key)
        .extension()
        .and_then(std::ffi::OsStr::to_str);

    use Compression::*;
    extension.and_then(|extension| match extension {
        "gz" => Some(Gzip),
        "zst" => Some(Zstd),
        // `Option::None` is spelled out because the glob import above brings
        // `Compression::None` into scope.
        _ => Option::None,
    })
}

#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    #[test]
    fn determine_compression() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for case in cases {
            let (key, content_encoding, content_type, expected) = case;
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }
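
    // Companion coverage for zstd detection (added for illustration): these
    // cases mirror the gzip cases above and the mappings in
    // `content_encoding_to_compression`, `content_type_to_compression`, and
    // `object_key_to_compression`.
    #[test]
    fn determine_compression_zstd() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("zstd"), None, Some(Compression::Zstd)),
            (
                "out.log",
                None,
                Some("application/zstd"),
                Some(Compression::Zstd),
            ),
            ("out.log.zst", None, None, Some(Compression::Zstd)),
        ];
        for (key, content_encoding, content_type, expected) in cases {
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }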

    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }
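
    // A minimal companion sketch to the gzip case above: an empty `ByteStream`
    // must decode to zero bytes regardless of the advertised encoding.
    #[tokio::test]
    async fn decode_empty_message_zstd() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("zstd"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }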
}

#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };

    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
        lookup::path,
    };
    use vrl::value::Value;

    use super::*;
    use crate::{
        SourceSender,
        aws::{AwsAuthentication, RegionOrEndpoint, create_client},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{S3ClientBuilder, sqs::S3Event},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            lines_from_gzip_file, random_lines, trace_init,
        },
    };

    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let file = io::BufReader::new(File::open(path).unwrap());
        file.lines().map(|x| x.unwrap()).collect()
    }

    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| format!(r#"{{"message": "{msg}"}}"#))
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }

    fn s3_address() -> String {
        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
    }

    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            let mut s3_event: S3Event = serde_json::from_str(
                r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {
                if let Some(schema_definition) =
                    config.outputs(namespace).pop().unwrap().schema_definition
                {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket"))
                        .unwrap(),
                    &bucket.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object"))
                        .unwrap(),
                    &key.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region"))
                        .unwrap(),
                    &"us-east-1".into()
                );
            }

            // Give the source time to delete or release the SQS message before
            // inspecting what remains on the queue.
            tokio::time::sleep(Duration::from_secs(10)).await;
            match status {
                Errored => {
                    // Errored events are retried, so the message must still be visible.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    // A rejected event leaves the message on the queue when deletion
                    // of failed messages is disabled.
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        })
        .await;
    }

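    /// Creates a fresh SQS queue with a short visibility timeout and returns its URL.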
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }

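    /// Polls the queue once and returns the number of messages received.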
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let sqs_result = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        sqs_result
            .messages
            .map(|messages| messages.len())
            .unwrap_or(0)
    }

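    /// Creates a fresh S3 bucket and returns its name.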
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }

    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }

    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
}