// vector/sources/aws_s3/mod.rs

1use std::convert::TryInto;
2
3use async_compression::tokio::bufread;
4use aws_smithy_types::byte_stream::ByteStream;
5use futures::{TryStreamExt, stream, stream::StreamExt};
6use snafu::Snafu;
7use tokio_util::io::StreamReader;
8use vector_lib::{
9    codecs::{
10        NewlineDelimitedDecoderConfig,
11        decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
12    },
13    config::{LegacyKey, LogNamespace},
14    configurable::configurable_component,
15    lookup::owned_value_path,
16};
17use vrl::value::{Kind, kind::Collection};
18
19use super::util::MultilineConfig;
20use crate::{
21    aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
22    codecs::DecodingConfig,
23    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
24    config::{
25        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
26    },
27    line_agg,
28    serde::{bool_or_struct, default_decoding},
29    tls::TlsConfig,
30};
31
32pub mod sqs;
33
/// Compression scheme for objects retrieved from S3.
//
// NOTE: `Auto` is resolved to a concrete scheme (or `None`) in
// `s3_object_decoder` via `determine_compression`.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined from its `Content-Encoding` and
    /// `Content-Type` metadata, as well as the key suffix (for example, `.gz`).
    ///
    /// It is set to `none` if the compression scheme cannot be determined.
    #[default]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}
58
/// Strategies for consuming objects from AWS S3.
//
// Currently SQS bucket notifications are the only supported strategy.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "lowercase")]
enum Strategy {
    /// Consumes objects by processing bucket notification events sent to an [AWS SQS queue][aws_sqs].
    ///
    /// [aws_sqs]: https://aws.amazon.com/sqs/
    #[default]
    Sqs,
}
70
/// Configuration for the `aws_s3` source.
// TODO: The `Default` impl here makes the configuration schema output look pretty weird, especially because all the
// usage of optionals means we're spewing out a ton of `"foo": null` stuff in the default value, and that's not helpful
// when there's required fields.
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    // AWS region and/or custom endpoint, flattened into the top-level config.
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    sqs: Option<sqs::Config>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    // AWS authentication options; docs derived from `AwsAuthentication`.
    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    // Accepts either a bare bool or the full struct form, hence the custom
    // deserializer.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Defaults to newline-delimited framing for backwards compatibility; see
    // `default_framing`.
    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Specifies which addressing style to use.
    ///
    /// This controls whether the bucket name is in the hostname, or part of the URL.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}
141
142const fn default_framing() -> FramingConfig {
143    // This is used for backwards compatibility. It used to be the only (hardcoded) option.
144    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
145        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
146    })
147}
148
// Serde/derivative default helper so `force_path_style` defaults to `true`.
const fn default_true() -> bool {
    true
}
152
153impl_generate_config_from_default!(AwsS3Config);
154
#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        // Resolve the effective log namespace (per-source setting overrides
        // the global one).
        let log_namespace = cx.log_namespace(self.log_namespace);

        // Fails here if the multiline config cannot be converted (e.g. bad
        // patterns).
        let multiline_config: Option<line_agg::Config> = self
            .multiline
            .as_ref()
            .map(|config| config.try_into())
            .transpose()?;

        match self.strategy {
            Strategy::Sqs => Ok(Box::pin(
                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?
                    .run(cx, self.acknowledgements, log_namespace),
            )),
        }
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        // Declare the metadata fields this source attaches to every event.
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
                &owned_value_path!("bucket"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
                &owned_value_path!("object"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
                &owned_value_path!("region"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata()
            // for metadata that is added to the events dynamically from the metadata
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // In the legacy namespace, dynamically-added object metadata lands
        // directly on the event, so unknown fields must be permitted.
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}
234
impl AwsS3Config {
    /// Builds the SQS-driven ingestor: an S3 client for fetching objects, an
    /// SQS client for receiving bucket notifications, and the configured
    /// decoder.
    ///
    /// Returns an error if `sqs` is not configured, or if client/decoder
    /// construction fails.
    async fn create_sqs_ingestor(
        &self,
        multiline: Option<line_agg::Config>,
        proxy: &ProxyConfig,
        log_namespace: LogNamespace,
    ) -> crate::Result<sqs::Ingestor> {
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        let s3_client = create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(self.force_path_style),
            },
            &self.auth,
            region.clone(),
            endpoint.clone(),
            proxy,
            self.tls_options.as_ref(),
            None,
        )
        .await?;

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        match self.sqs {
            Some(ref sqs) => {
                // `create_client_and_region` also returns the region the
                // client actually resolved; that (shadowed) region is what is
                // handed to the ingestor below.
                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
                    &SqsClientBuilder {},
                    &self.auth,
                    region.clone(),
                    endpoint,
                    proxy,
                    sqs.tls_options.as_ref(),
                    sqs.timeout.as_ref(),
                )
                .await?;

                let ingestor = sqs::Ingestor::new(
                    region,
                    sqs_client,
                    s3_client,
                    sqs.clone(),
                    self.compression,
                    multiline,
                    decoder,
                )
                .await?;

                Ok(ingestor)
            }
            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
        }
    }
}
292
/// Errors that can occur while constructing the SQS ingestor.
#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    #[snafu(display("Configuration for `sqs` required when strategy=sqs"))]
    ConfigMissing,
}
298
299/// None if body is empty
300async fn s3_object_decoder(
301    compression: Compression,
302    key: &str,
303    content_encoding: Option<&str>,
304    content_type: Option<&str>,
305    mut body: ByteStream,
306) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
307    let first = match body.next().await {
308        Some(first) => first,
309        _ => {
310            return Box::new(tokio::io::empty());
311        }
312    };
313
314    let r = tokio::io::BufReader::new(StreamReader::new(
315        stream::iter(Some(first))
316            .chain(Box::pin(async_stream::stream! {
317                while let Some(next) = body.next().await {
318                    yield next;
319                }
320            }))
321            .map_err(std::io::Error::other),
322    ));
323
324    let compression = match compression {
325        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
326        _ => compression,
327    };
328
329    use Compression::*;
330    match compression {
331        Auto => unreachable!(), // is mapped above
332        None => Box::new(r),
333        Gzip => Box::new({
334            let mut decoder = bufread::GzipDecoder::new(r);
335            decoder.multiple_members(true);
336            decoder
337        }),
338        Zstd => Box::new({
339            let mut decoder = bufread::ZstdDecoder::new(r);
340            decoder.multiple_members(true);
341            decoder
342        }),
343    }
344}
345
346// try to determine the compression given the:
347// * content-encoding
348// * content-type
349// * key name (for file extension)
350//
351// It will use this information in this order
352fn determine_compression(
353    content_encoding: Option<&str>,
354    content_type: Option<&str>,
355    key: &str,
356) -> Option<Compression> {
357    content_encoding
358        .and_then(content_encoding_to_compression)
359        .or_else(|| content_type.and_then(content_type_to_compression))
360        .or_else(|| object_key_to_compression(key))
361}
362
363fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
364    match content_encoding {
365        "gzip" => Some(Compression::Gzip),
366        "zstd" => Some(Compression::Zstd),
367        _ => None,
368    }
369}
370
371fn content_type_to_compression(content_type: &str) -> Option<Compression> {
372    match content_type {
373        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
374        "application/zstd" => Some(Compression::Zstd),
375        _ => None,
376    }
377}
378
379fn object_key_to_compression(key: &str) -> Option<Compression> {
380    let extension = std::path::Path::new(key)
381        .extension()
382        .and_then(std::ffi::OsStr::to_str);
383
384    use Compression::*;
385    extension.and_then(|extension| match extension {
386        "gz" => Some(Gzip),
387        "zst" => Some(Zstd),
388        _ => Option::None,
389    })
390}
391
#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    // Exercises `determine_compression` precedence: Content-Encoding, then
    // Content-Type, then the key's file extension.
    #[test]
    fn determine_compression() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for case in cases {
            let (key, content_encoding, content_type, expected) = case;
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }

    // An empty S3 body must decode to empty output rather than a gzip error.
    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }
}
443
444#[cfg(feature = "aws-s3-integration-tests")]
445#[cfg(test)]
446mod integration_tests {
447    use std::{
448        any::Any,
449        collections::HashMap,
450        fs::File,
451        io::{self, BufRead},
452        path::Path,
453        time::Duration,
454    };
455
456    use aws_sdk_s3::Client as S3Client;
457    use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
458    use similar_asserts::assert_eq;
459    use vector_lib::{
460        codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
461        lookup::path,
462    };
463    use vrl::value::Value;
464
465    use super::*;
466    use crate::{
467        SourceSender,
468        aws::{AwsAuthentication, RegionOrEndpoint, create_client},
469        common::sqs::SqsClientBuilder,
470        config::{ProxyConfig, SourceConfig, SourceContext},
471        event::EventStatus::{self, *},
472        line_agg,
473        sources::{
474            aws_s3::{S3ClientBuilder, sqs::S3Event},
475            util::MultilineConfig,
476        },
477        test_util::{
478            collect_n,
479            components::{SOURCE_TAGS, assert_source_compliance},
480            lines_from_gzip_file, random_lines, trace_init,
481        },
482    };
483
484    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
485        let file = io::BufReader::new(File::open(path).unwrap());
486        file.lines().map(|x| x.unwrap()).collect()
487    }
488
    // A plain newline-delimited object is emitted as one event per line.
    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
509
    // With the JSON deserializer, each line's `message` field becomes the
    // event message.
    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| {
                // convert to JSON object
                format!(r#"{{"message": "{msg}"}}"#)
            })
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }
538
    // Same as `s3_process_message`, but with the Vector log namespace enabled.
    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
559
    // Object keys containing spaces must round-trip through the SQS
    // notification intact.
    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
581
    // Object keys with special characters (here, a `:`) must round-trip
    // through the SQS notification intact.
    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
603
    // A gzipped object (Content-Encoding: gzip) is transparently decompressed.
    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
633
    // An object made of multiple concatenated gzip members must decode fully
    // (relies on `multiple_members(true)` in `s3_object_decoder`).
    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
664
    // An object made of multiple concatenated zstd frames must decode fully;
    // the expected lines come from the matching plaintext fixture.
    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
695
    // Multiline aggregation collapses the three input lines into a single
    // event.
    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
724
    // TODO: re-enable this after figuring out why it is so flakey in CI
    //       https://github.com/vectordotdev/vector/issues/17456
    //
    // An `Errored` event status should leave the SQS message in the queue.
    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
748
    // A `Rejected` status still deletes the SQS message under the default
    // `delete_failed_message` setting (asserted inside `test_event`).
    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }
769
    // With `delete_failed_message: false`, a `Rejected` status leaves the SQS
    // message in the queue (asserted inside `test_event`).
    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }
793
794    fn s3_address() -> String {
795        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
796    }
797
    /// Builds an `AwsS3Config` pointed at the local mock endpoint, consuming
    /// notifications from the given SQS queue with fast polling and
    /// acknowledgements enabled.
    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }
823
    // puts an object and asserts that the logs it gets back match
    //
    // Flow: create a queue and bucket, upload `payload` as an object, manually
    // send the S3 bucket-notification to SQS (localstack does not do this
    // automatically), run the source, then check the emitted events and the
    // final state of the queue for the given ack `status`.
    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            // Optional override: keep rejected messages in the queue.
            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            // Upload the object under test with the requested metadata.
            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            // Template bucket-notification event; bucket and key are patched
            // in below to match the object just uploaded.
            let mut s3_event: S3Event = serde_json::from_str(
            r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            // send SQS message (this is usually sent by S3 itself when an object is uploaded)
            // This does not automatically work with localstack and the AWS SDK, so this is done manually
            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {

                // Every event must satisfy the schema this source advertises.
                if let Some(schema_definition) = config.outputs(namespace).pop().unwrap().schema_definition {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket")).unwrap(), &bucket.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object")).unwrap(), &key.clone().into());
                assert_eq!(namespace.get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region")).unwrap(), &"us-east-1".into());
            }

            // Unfortunately we need a fairly large sleep here to ensure that the source has actually managed to delete the SQS message.
            // The deletion of this message occurs after the Event has been sent out by the source and there is no way of knowing when this
            // process has finished other than waiting around for a while.
            tokio::time::sleep(Duration::from_secs(10)).await;
            // Make sure the SQS message is deleted
            match status {
                Errored => {
                    // need to wait up to the visibility timeout before it will be counted again
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }
976
    /// creates a new SQS queue with a short (2s) visibility timeout
    ///
    /// returns the queue URL
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }
993
994    /// count the number of messages in a SQS queue
995    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
996        let sqs_result = client
997            .receive_message()
998            .queue_url(queue)
999            .visibility_timeout(0)
1000            .wait_time_seconds(wait_time_seconds)
1001            .send()
1002            .await
1003            .unwrap();
1004
1005        sqs_result
1006            .messages
1007            .map(|messages| messages.len())
1008            .unwrap_or(0)
1009    }
1010
    /// creates a new S3 bucket with a random (UUID) name
    ///
    /// returns the bucket name
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }
1026
    /// Builds an S3 client pointed at the local mock endpoint with static
    /// test credentials.
    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        // Use path-style addressing (bucket in the URL path, not the
        // hostname) against the local endpoint.
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
1049
    /// Builds an SQS client pointed at the local mock endpoint with static
    /// test credentials.
    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
1069}