// vector/sources/aws_s3/mod.rs

1use std::convert::TryInto;
2
3use async_compression::tokio::bufread;
4use aws_smithy_types::byte_stream::ByteStream;
5use futures::{stream, stream::StreamExt, TryStreamExt};
6use snafu::Snafu;
7use tokio_util::io::StreamReader;
8use vector_lib::codecs::decoding::{
9    DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions,
10};
11use vector_lib::codecs::NewlineDelimitedDecoderConfig;
12use vector_lib::config::{LegacyKey, LogNamespace};
13use vector_lib::configurable::configurable_component;
14use vector_lib::lookup::owned_value_path;
15use vrl::value::{kind::Collection, Kind};
16
17use super::util::MultilineConfig;
18use crate::codecs::DecodingConfig;
19use crate::{
20    aws::{auth::AwsAuthentication, create_client, create_client_and_region, RegionOrEndpoint},
21    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
22    config::{
23        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
24    },
25    line_agg,
26    serde::{bool_or_struct, default_decoding},
27    tls::TlsConfig,
28};
29
30pub mod sqs;
31
/// Compression scheme for objects retrieved from S3.
// `Auto` is resolved to a concrete scheme in `s3_object_decoder` via
// `determine_compression`; the decode dispatch never sees `Auto` itself.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined from its `Content-Encoding` and
    /// `Content-Type` metadata, as well as the key suffix (for example, `.gz`).
    ///
    /// It is set to `none` if the compression scheme cannot be determined.
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}
57
/// Strategies for consuming objects from AWS S3.
// Only one strategy exists today; the field that selects it is hidden from the
// docs (see `AwsS3Config::strategy`), so adding a variant here is additive.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
enum Strategy {
    /// Consumes objects by processing bucket notification events sent to an [AWS SQS queue][aws_sqs].
    ///
    /// [aws_sqs]: https://aws.amazon.com/sqs/
    #[derivative(Default)]
    Sqs,
}
70
/// Configuration for the `aws_s3` source.
// TODO: The `Default` impl here makes the configuration schema output look pretty weird, especially because all the
// usage of optionals means we're spewing out a ton of `"foo": null` stuff in the default value, and that's not helpful
// when there's required fields.
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    // Flattened so `region`/`endpoint` appear as top-level keys in the config.
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    // Hidden because `Sqs` is currently the only variant; see `Strategy`.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    // Required in practice when `strategy` is `sqs`; validated at build time in
    // `create_sqs_ingestor` (missing config yields `CreateSqsIngestorError::ConfigMissing`).
    sqs: Option<sqs::Config>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    // Accepts either a bare boolean or the full struct form.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Defaults to newline-delimited framing for backwards compatibility.
    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Specifies which addressing style to use.
    ///
    /// This controls whether the bucket name is in the hostname, or part of the URL.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}
141
142const fn default_framing() -> FramingConfig {
143    // This is used for backwards compatibility. It used to be the only (hardcoded) option.
144    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
145        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
146    })
147}
148
// Serde default helper used by `force_path_style`.
const fn default_true() -> bool {
    true
}
152
// Derives `GenerateConfig` for the source from its `Default` impl.
impl_generate_config_from_default!(AwsS3Config);
154
#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    // Builds the running source. Currently this always goes through the SQS
    // strategy; a missing `sqs` table surfaces as an error from
    // `create_sqs_ingestor`.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        // Convert the user-facing multiline config into the internal
        // line-aggregation config, propagating any validation error.
        let multiline_config: Option<line_agg::Config> = self
            .multiline
            .as_ref()
            .map(|config| config.try_into())
            .transpose()?;

        match self.strategy {
            Strategy::Sqs => Ok(Box::pin(
                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?
                    .run(cx, self.acknowledgements, log_namespace),
            )),
        }
    }

    // Describes the single log output, registering the metadata fields this
    // source attaches (`bucket`, `object`, `region`, `timestamp`, plus a
    // free-form `metadata` object) for both legacy and Vector namespaces.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
                &owned_value_path!("bucket"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
                &owned_value_path!("object"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
                &owned_value_path!("region"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata()
            // for metadata that is added to the events dynamically from the metadata
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // for metadata that is added to the events dynamically from the metadata
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    // End-to-end acknowledgements are supported (events are finalized before
    // the corresponding SQS message is deleted).
    fn can_acknowledge(&self) -> bool {
        true
    }
}
234
235impl AwsS3Config {
236    async fn create_sqs_ingestor(
237        &self,
238        multiline: Option<line_agg::Config>,
239        proxy: &ProxyConfig,
240        log_namespace: LogNamespace,
241    ) -> crate::Result<sqs::Ingestor> {
242        let region = self.region.region();
243        let endpoint = self.region.endpoint();
244
245        let s3_client = create_client::<S3ClientBuilder>(
246            &S3ClientBuilder {
247                force_path_style: Some(self.force_path_style),
248            },
249            &self.auth,
250            region.clone(),
251            endpoint.clone(),
252            proxy,
253            self.tls_options.as_ref(),
254            None,
255        )
256        .await?;
257
258        let decoder =
259            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
260                .build()?;
261
262        match self.sqs {
263            Some(ref sqs) => {
264                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
265                    &SqsClientBuilder {},
266                    &self.auth,
267                    region.clone(),
268                    endpoint,
269                    proxy,
270                    sqs.tls_options.as_ref(),
271                    sqs.timeout.as_ref(),
272                )
273                .await?;
274
275                let ingestor = sqs::Ingestor::new(
276                    region,
277                    sqs_client,
278                    s3_client,
279                    sqs.clone(),
280                    self.compression,
281                    multiline,
282                    decoder,
283                )
284                .await?;
285
286                Ok(ingestor)
287            }
288            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
289        }
290    }
291}
292
// Errors that can occur while building the SQS ingestor in
// `AwsS3Config::create_sqs_ingestor`.
#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}
298
299/// None if body is empty
300async fn s3_object_decoder(
301    compression: Compression,
302    key: &str,
303    content_encoding: Option<&str>,
304    content_type: Option<&str>,
305    mut body: ByteStream,
306) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
307    let first = match body.next().await {
308        Some(first) => first,
309        _ => {
310            return Box::new(tokio::io::empty());
311        }
312    };
313
314    let r = tokio::io::BufReader::new(StreamReader::new(
315        stream::iter(Some(first))
316            .chain(Box::pin(async_stream::stream! {
317                while let Some(next) = body.next().await {
318                    yield next;
319                }
320            }))
321            .map_err(std::io::Error::other),
322    ));
323
324    let compression = match compression {
325        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
326        _ => compression,
327    };
328
329    use Compression::*;
330    match compression {
331        Auto => unreachable!(), // is mapped above
332        None => Box::new(r),
333        Gzip => Box::new({
334            let mut decoder = bufread::GzipDecoder::new(r);
335            decoder.multiple_members(true);
336            decoder
337        }),
338        Zstd => Box::new({
339            let mut decoder = bufread::ZstdDecoder::new(r);
340            decoder.multiple_members(true);
341            decoder
342        }),
343    }
344}
345
346// try to determine the compression given the:
347// * content-encoding
348// * content-type
349// * key name (for file extension)
350//
351// It will use this information in this order
352fn determine_compression(
353    content_encoding: Option<&str>,
354    content_type: Option<&str>,
355    key: &str,
356) -> Option<Compression> {
357    content_encoding
358        .and_then(content_encoding_to_compression)
359        .or_else(|| content_type.and_then(content_type_to_compression))
360        .or_else(|| object_key_to_compression(key))
361}
362
363fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
364    match content_encoding {
365        "gzip" => Some(Compression::Gzip),
366        "zstd" => Some(Compression::Zstd),
367        _ => None,
368    }
369}
370
371fn content_type_to_compression(content_type: &str) -> Option<Compression> {
372    match content_type {
373        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
374        "application/zstd" => Some(Compression::Zstd),
375        _ => None,
376    }
377}
378
379fn object_key_to_compression(key: &str) -> Option<Compression> {
380    let extension = std::path::Path::new(key)
381        .extension()
382        .and_then(std::ffi::OsStr::to_str);
383
384    use Compression::*;
385    extension.and_then(|extension| match extension {
386        "gz" => Some(Gzip),
387        "zst" => Some(Zstd),
388        _ => Option::None,
389    })
390}
391
#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    #[test]
    fn determine_compression() {
        use super::Compression;

        // (key, content-encoding, content-type, expected scheme)
        let cases = [
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for (key, content_encoding, content_type, expected) in cases {
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }

    #[tokio::test]
    async fn decode_empty_message_gzip() {
        // An empty object must decode to zero bytes even when gzip is
        // indicated by the metadata.
        let key = uuid::Uuid::new_v4().to_string();

        let mut decoded = Vec::new();
        let mut reader = s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await;
        reader.read_to_end(&mut decoded).await.unwrap();

        assert!(decoded.is_empty());
    }
}
443
// Integration tests that exercise the source end-to-end against a live
// (localstack-style) S3/SQS endpoint. They create a bucket and queue, upload
// an object, hand-deliver the bucket-notification message to SQS, run the
// source, and assert on the emitted events and on SQS message deletion.
#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };

    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{types::QueueAttributeName, Client as SqsClient};
    use similar_asserts::assert_eq;
    use vector_lib::codecs::{decoding::DeserializerConfig, JsonDeserializerConfig};
    use vector_lib::lookup::path;
    use vrl::value::Value;

    use super::*;
    use crate::{
        aws::{create_client, AwsAuthentication, RegionOrEndpoint},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{sqs::S3Event, S3ClientBuilder},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{assert_source_compliance, SOURCE_TAGS},
            lines_from_gzip_file, random_lines, trace_init,
        },
        SourceSender,
    };

    // Reads an uncompressed fixture file into a vector of its lines.
    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let file = io::BufReader::new(File::open(path).unwrap());
        file.lines().map(|x| x.unwrap()).collect()
    }

    // Plain newline-delimited payload round-trips line-for-line.
    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // JSON decoding extracts the `message` field from each JSON line.
    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| {
                // convert to JSON object
                format!(r#"{{"message": "{msg}"}}"#)
            })
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }

    // Same as `s3_process_message` but with the Vector log namespace enabled.
    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Object keys containing spaces must survive the SQS round trip.
    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Object keys with characters that need URL-encoding (e.g. `:`).
    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Gzip-compressed payload (Content-Encoding: gzip) is transparently decoded.
    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Concatenated gzip members (multipart uploads) must all be decoded;
    // exercises `multiple_members(true)` in `s3_object_decoder`.
    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Same as above, for multi-frame zstd payloads.
    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Multiline aggregation: three lines collapse into one event.
    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // TODO: re-enable this after figuring out why it is so flakey in CI
    //       https://github.com/vectordotdev/vector/issues/17456
    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Rejected events: the SQS message should be deleted by default.
    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // Rejected events with `delete_failed_message = false`: the SQS message
    // must remain in the queue.
    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }

    // Endpoint of the local test stack (overridable via `S3_ADDRESS`).
    fn s3_address() -> String {
        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
    }

    // Builds a source config pointed at the test stack with fast SQS polling.
    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }

    // puts an object and asserts that the logs it gets back match
    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            // Give the freshly created queue/bucket a moment to settle.
            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            // `custom_options` is a loosely-typed bag; currently only
            // `delete_failed_message: bool` is understood.
            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            // Template notification event; bucket name and object key are
            // patched in below.
            let mut s3_event: S3Event = serde_json::from_str(
            r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            // send SQS message (this is usually sent by S3 itself when an object is uploaded)
            // This does not automatically work with localstack and the AWS SDK, so this is done manually
            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {

                // Every event must satisfy the schema advertised by `outputs`.
                if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
            }

            // Unfortunately we need a fairly large sleep here to ensure that the source has actually managed to delete the SQS message.
            // The deletion of this message occurs after the Event has been sent out by the source and there is no way of knowing when this
            // process has finished other than waiting around for a while.
            tokio::time::sleep(Duration::from_secs(10)).await;
            // Make sure the SQS message is deleted
            match status {
                Errored => {
                    // need to wait up to the visibility timeout before it will be counted again
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }

    /// creates a new SQS queue
    ///
    /// returns the queue name
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }

    /// count the number of messages in a SQS queue
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let sqs_result = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        sqs_result
            .messages
            .map(|messages| messages.len())
            .unwrap_or(0)
    }

    /// creates a new S3 bucket
    ///
    /// returns the bucket name
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }

    // S3 client pointed at the test stack, using path-style addressing.
    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }

    // SQS client pointed at the same test stack endpoint.
    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
}