vector/sources/aws_s3/mod.rs

use std::convert::TryInto;

use async_compression::tokio::bufread;
use aws_smithy_types::byte_stream::ByteStream;
use futures::{TryStreamExt, stream, stream::StreamExt};
use snafu::Snafu;
use tokio_util::io::StreamReader;
use vector_lib::{
    codecs::{
        NewlineDelimitedDecoderConfig,
        decoding::{DeserializerConfig, FramingConfig, NewlineDelimitedDecoderOptions},
    },
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::owned_value_path,
};
use vrl::value::{Kind, kind::Collection};

use super::util::MultilineConfig;
use crate::{
    aws::{RegionOrEndpoint, auth::AwsAuthentication, create_client, create_client_and_region},
    codecs::DecodingConfig,
    common::{s3::S3ClientBuilder, sqs::SqsClientBuilder},
    config::{
        ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
    },
    line_agg,
    serde::{bool_or_struct, default_decoding},
    tls::TlsConfig,
};

pub mod sqs;

/// Compression scheme for objects retrieved from S3.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined from its `Content-Encoding` and
    /// `Content-Type` metadata, as well as the key suffix (for example, `.gz`).
    ///
    /// It is set to `none` if the compression scheme cannot be determined.
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,

    /// ZSTD.
    Zstd,
}

/// Strategies for consuming objects from AWS S3.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
enum Strategy {
    /// Consumes objects by processing bucket notification events sent to an [AWS SQS queue][aws_sqs].
    ///
    /// [aws_sqs]: https://aws.amazon.com/sqs/
    #[derivative(Default)]
    Sqs,
}

/// Configuration for the `aws_s3` source.
// TODO: The `Default` impl here makes the configuration schema output look pretty weird, especially because all the
// usage of optionals means we're spewing out a ton of `"foo": null` stuff in the default value, and that's not helpful
// when there's required fields.
//
// Maybe showing defaults at all, when there are required properties, doesn't actually make sense? :thinkies:
#[configurable_component(source("aws_s3", "Collect logs from AWS S3."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default, deny_unknown_fields)]
pub struct AwsS3Config {
    #[serde(flatten)]
    region: RegionOrEndpoint,

    /// The compression scheme used for decompressing objects retrieved from S3.
    compression: Compression,

    /// The strategy to use to consume objects from S3.
    #[configurable(metadata(docs::hidden))]
    strategy: Strategy,

    /// Configuration options for SQS.
    sqs: Option<sqs::Config>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    auth: AwsAuthentication,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    multiline: Option<MultilineConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    tls_options: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing")]
    #[derivative(Default(value = "default_framing()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    /// Specifies which addressing style to use.
    ///
    /// This controls whether the bucket name is in the hostname, or part of the URL path.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub force_path_style: bool,
}
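
// An illustrative minimal configuration for this source, assuming an SQS queue that
// receives the bucket's `ObjectCreated` notifications (the region and queue URL are
// placeholders, not real resources):
//
//   sources:
//     my_s3_source:
//       type: aws_s3
//       region: us-east-1
//       compression: auto
//       sqs:
//         queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"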

const fn default_framing() -> FramingConfig {
    // This is used for backwards compatibility. It used to be the only (hardcoded) option.
    FramingConfig::NewlineDelimited(NewlineDelimitedDecoderConfig {
        newline_delimited: NewlineDelimitedDecoderOptions { max_length: None },
    })
}
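
// In config terms, the default above corresponds to (an assumed rendering; `method`
// is how the framing variant is selected in Vector configuration):
//
//   framing:
//     method: newline_delimited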

const fn default_true() -> bool {
    true
}

impl_generate_config_from_default!(AwsS3Config);

#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SourceConfig for AwsS3Config {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let multiline_config: Option<line_agg::Config> = self
            .multiline
            .as_ref()
            .map(|config| config.try_into())
            .transpose()?;

        match self.strategy {
            Strategy::Sqs => Ok(Box::pin(
                self.create_sqs_ingestor(multiline_config, &cx.proxy, log_namespace)
                    .await?
                    .run(cx, self.acknowledgements, log_namespace),
            )),
        }
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("bucket"))),
                &owned_value_path!("bucket"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("object"))),
                &owned_value_path!("object"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("region"))),
                &owned_value_path!("region"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_standard_vector_source_metadata()
            // for S3 object metadata, which is added to events dynamically
            .with_source_metadata(
                Self::NAME,
                None,
                &owned_value_path!("metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // in the Legacy namespace, the dynamic S3 object metadata lands as unknown
        // fields at the event root
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }
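
        // For reference (assumed path shapes, following from the definitions above):
        // with the Legacy namespace these fields land on the event itself (`.bucket`,
        // `.object`, `.region`, `.timestamp`), while with the Vector namespace they
        // land in source metadata (`%aws_s3.bucket`, `%aws_s3.object`, and so on).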

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

impl AwsS3Config {
    async fn create_sqs_ingestor(
        &self,
        multiline: Option<line_agg::Config>,
        proxy: &ProxyConfig,
        log_namespace: LogNamespace,
    ) -> crate::Result<sqs::Ingestor> {
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        let s3_client = create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(self.force_path_style),
            },
            &self.auth,
            region.clone(),
            endpoint.clone(),
            proxy,
            self.tls_options.as_ref(),
            None,
        )
        .await?;

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        match self.sqs {
            Some(ref sqs) => {
                let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
                    &SqsClientBuilder {},
                    &self.auth,
                    region.clone(),
                    endpoint,
                    proxy,
                    sqs.tls_options.as_ref(),
                    sqs.timeout.as_ref(),
                )
                .await?;

                let ingestor = sqs::Ingestor::new(
                    region,
                    sqs_client,
                    s3_client,
                    sqs.clone(),
                    self.compression,
                    multiline,
                    decoder,
                )
                .await?;

                Ok(ingestor)
            }
            None => Err(CreateSqsIngestorError::ConfigMissing {}.into()),
        }
    }
}

#[derive(Debug, Snafu)]
enum CreateSqsIngestorError {
    #[snafu(display("Configuration for `sqs` is required when `strategy = sqs`"))]
    ConfigMissing,
}

/// Returns an empty reader if the object body is empty.
async fn s3_object_decoder(
    compression: Compression,
    key: &str,
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    mut body: ByteStream,
) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
    let first = match body.next().await {
        Some(first) => first,
        _ => {
            return Box::new(tokio::io::empty());
        }
    };

    let r = tokio::io::BufReader::new(StreamReader::new(
        stream::iter(Some(first))
            .chain(Box::pin(async_stream::stream! {
                while let Some(next) = body.next().await {
                    yield next;
                }
            }))
            .map_err(std::io::Error::other),
    ));

    use Compression::*;
    let compression = match compression {
        Auto => determine_compression(content_encoding, content_type, key).unwrap_or(None),
        _ => compression,
    };

    match compression {
        Auto => unreachable!(), // is mapped above
        None => Box::new(r),
        Gzip => Box::new({
            let mut decoder = bufread::GzipDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
        Zstd => Box::new({
            let mut decoder = bufread::ZstdDecoder::new(r);
            decoder.multiple_members(true);
            decoder
        }),
    }
}
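
// A usage sketch for the decoder above (hypothetical caller; `object` stands in for
// an `aws_sdk_s3` `GetObjectOutput`, whose field names are assumed here):
//
//   let mut reader = s3_object_decoder(
//       config.compression,
//       &key,
//       object.content_encoding.as_deref(),
//       object.content_type.as_deref(),
//       object.body,
//   )
//   .await;
//   // `reader` implements `AsyncRead` and yields the decompressed bytes.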

// Try to determine the compression scheme from, in order of precedence:
// * the `Content-Encoding` metadata
// * the `Content-Type` metadata
// * the key name (its file extension)
fn determine_compression(
    content_encoding: Option<&str>,
    content_type: Option<&str>,
    key: &str,
) -> Option<Compression> {
    content_encoding
        .and_then(content_encoding_to_compression)
        .or_else(|| content_type.and_then(content_type_to_compression))
        .or_else(|| object_key_to_compression(key))
}
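
// To illustrate the precedence (a sketch; consistent with the unit test below):
//
//   determine_compression(Some("gzip"), None, "out.log.zst")            => Some(Compression::Gzip)
//   determine_compression(None, Some("application/zstd"), "out.log.gz") => Some(Compression::Zstd)
//   determine_compression(None, None, "out.log.gz")                     => Some(Compression::Gzip)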

fn content_encoding_to_compression(content_encoding: &str) -> Option<Compression> {
    match content_encoding {
        "gzip" => Some(Compression::Gzip),
        "zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn content_type_to_compression(content_type: &str) -> Option<Compression> {
    match content_type {
        "application/gzip" | "application/x-gzip" => Some(Compression::Gzip),
        "application/zstd" => Some(Compression::Zstd),
        _ => None,
    }
}

fn object_key_to_compression(key: &str) -> Option<Compression> {
    let extension = std::path::Path::new(key)
        .extension()
        .and_then(std::ffi::OsStr::to_str);

    use Compression::*;
    extension.and_then(|extension| match extension {
        "gz" => Some(Gzip),
        "zst" => Some(Zstd),
        // `Option::None` is spelled out to disambiguate from the glob-imported
        // `Compression::None`.
        _ => Option::None,
    })
}

#[cfg(test)]
mod test {
    use tokio::io::AsyncReadExt;

    use super::*;

    #[test]
    fn determine_compression() {
        use super::Compression;

        let cases = vec![
            ("out.log", Some("gzip"), None, Some(Compression::Gzip)),
            (
                "out.log",
                None,
                Some("application/gzip"),
                Some(Compression::Gzip),
            ),
            ("out.log.gz", None, None, Some(Compression::Gzip)),
            ("out.txt", None, None, None),
        ];
        for case in cases {
            let (key, content_encoding, content_type, expected) = case;
            assert_eq!(
                super::determine_compression(content_encoding, content_type, key),
                expected,
                "key={key:?} content_encoding={content_encoding:?} content_type={content_type:?}",
            );
        }
    }

    #[tokio::test]
    async fn decode_empty_message_gzip() {
        let key = uuid::Uuid::new_v4().to_string();

        let mut data = Vec::new();
        s3_object_decoder(
            Compression::Auto,
            &key,
            Some("gzip"),
            None,
            ByteStream::default(),
        )
        .await
        .read_to_end(&mut data)
        .await
        .unwrap();

        assert!(data.is_empty());
    }
}

#[cfg(feature = "aws-s3-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::{
        any::Any,
        collections::HashMap,
        fs::File,
        io::{self, BufRead},
        path::Path,
        time::Duration,
    };

    use aws_sdk_s3::Client as S3Client;
    use aws_sdk_sqs::{Client as SqsClient, types::QueueAttributeName};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::{JsonDeserializerConfig, decoding::DeserializerConfig},
        lookup::path,
    };
    use vrl::value::Value;

    use super::*;
    use crate::{
        SourceSender,
        aws::{AwsAuthentication, RegionOrEndpoint, create_client},
        common::sqs::SqsClientBuilder,
        config::{ProxyConfig, SourceConfig, SourceContext},
        event::EventStatus::{self, *},
        line_agg,
        sources::{
            aws_s3::{S3ClientBuilder, sqs::S3Event},
            util::MultilineConfig,
        },
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            lines_from_gzip_file, random_lines, trace_init,
        },
    };

    fn lines_from_plaintext<P: AsRef<Path>>(path: P) -> Vec<String> {
        let file = io::BufReader::new(File::open(path).unwrap());
        file.lines().map(|x| x.unwrap()).collect()
    }

    #[tokio::test]
    async fn s3_process_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_json_message() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let json_logs: Vec<String> = logs
            .iter()
            .map(|msg| {
                // convert to JSON object
                format!(r#"{{"message": "{msg}"}}"#)
            })
            .collect();

        test_event(
            None,
            None,
            None,
            None,
            json_logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Json(JsonDeserializerConfig::default()),
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_with_log_namespace() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            true,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_spaces() {
        trace_init();

        let key = "key with spaces".to_string();
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_special_characters() {
        trace_init();

        let key = format!("special:{}", uuid::Uuid::new_v4());
        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            Some(key),
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_gzip() {
        use std::io::Read;

        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut gz = flate2::read::GzEncoder::new(
            io::Cursor::new(logs.join("\n").into_bytes()),
            flate2::Compression::fast(),
        );
        let mut buffer = Vec::new();
        gz.read_to_end(&mut buffer).unwrap();

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_gzip() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_gzip_file("tests/data/multipart-gzip.log.gz");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-gzip.log.gz").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("gzip"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multipart_zstd() {
        use std::io::Read;

        trace_init();

        let logs = lines_from_plaintext("tests/data/multipart-zst.log");

        let buffer = {
            let mut file =
                File::open("tests/data/multipart-zst.log.zst").expect("file can be opened");
            let mut data = Vec::new();
            file.read_to_end(&mut data).expect("file can be read");
            data
        };

        test_event(
            None,
            Some("zstd"),
            None,
            None,
            buffer,
            logs,
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn s3_process_message_multiline() {
        trace_init();

        let logs: Vec<String> = vec!["abc", "def", "geh"]
            .into_iter()
            .map(ToOwned::to_owned)
            .collect();

        test_event(
            None,
            None,
            None,
            Some(MultilineConfig {
                start_pattern: "abc".to_owned(),
                mode: line_agg::Mode::HaltWith,
                condition_pattern: "geh".to_owned(),
                timeout_ms: Duration::from_millis(1000),
            }),
            logs.join("\n").into_bytes(),
            vec!["abc\ndef\ngeh".to_owned()],
            Delivered,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    // TODO: re-enable this after figuring out why it is so flaky in CI
    //       https://github.com/vectordotdev/vector/issues/17456
    #[ignore]
    #[tokio::test]
    async fn handles_errored_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Errored,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn handles_failed_status_without_deletion() {
        trace_init();

        let logs: Vec<String> = random_lines(100).take(10).collect();

        let mut custom_options: HashMap<String, Box<dyn Any>> = HashMap::new();
        custom_options.insert("delete_failed_message".to_string(), Box::new(false));

        test_event(
            None,
            None,
            None,
            None,
            logs.join("\n").into_bytes(),
            logs,
            Rejected,
            false,
            DeserializerConfig::Bytes,
            Some(custom_options),
        )
        .await;
    }

    fn s3_address() -> String {
        std::env::var("S3_ADDRESS").unwrap_or_else(|_| "http://localhost:4566".into())
    }

    fn config(
        queue_url: &str,
        multiline: Option<MultilineConfig>,
        log_namespace: bool,
        decoding: DeserializerConfig,
    ) -> AwsS3Config {
        AwsS3Config {
            region: RegionOrEndpoint::with_both("us-east-1", s3_address()),
            strategy: Strategy::Sqs,
            compression: Compression::Auto,
            multiline,
            sqs: Some(sqs::Config {
                queue_url: queue_url.to_string(),
                poll_secs: 1,
                max_number_of_messages: 10,
                visibility_timeout_secs: 0,
                client_concurrency: None,
                ..Default::default()
            }),
            acknowledgements: true.into(),
            log_namespace: Some(log_namespace),
            decoding,
            ..Default::default()
        }
    }

    // puts an object and asserts that the logs it gets back match
    #[allow(clippy::too_many_arguments)]
    async fn test_event(
        key: Option<String>,
        content_encoding: Option<&str>,
        content_type: Option<&str>,
        multiline: Option<MultilineConfig>,
        payload: Vec<u8>,
        expected_lines: Vec<String>,
        status: EventStatus,
        log_namespace: bool,
        decoding: DeserializerConfig,
        custom_options: Option<HashMap<String, Box<dyn Any>>>,
    ) {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let key = key.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            let s3 = s3_client().await;
            let sqs = sqs_client().await;

            let queue = create_queue(&sqs).await;
            let bucket = create_bucket(&s3).await;

            tokio::time::sleep(Duration::from_secs(1)).await;

            let mut config = config(&queue, multiline, log_namespace, decoding);

            if let Some(false) = custom_options
                .as_ref()
                .and_then(|opts| opts.get("delete_failed_message"))
                .and_then(|val| val.downcast_ref::<bool>())
                .copied()
            {
                config.sqs.as_mut().unwrap().delete_failed_message = false;
            }

            s3.put_object()
                .bucket(bucket.clone())
                .key(key.clone())
                .body(ByteStream::from(payload))
                .set_content_type(content_type.map(|t| t.to_owned()))
                .set_content_encoding(content_encoding.map(|t| t.to_owned()))
                .send()
                .await
                .expect("Could not put object");

            let sqs_client = sqs_client().await;

            let mut s3_event: S3Event = serde_json::from_str(
            r#"
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-east-1",
         "eventTime":"2022-03-24T19:43:00.548Z",
         "eventName":"ObjectCreated:Put",
         "userIdentity":{
            "principalId":"AWS:ARNOTAREALIDD4:user.name"
         },
         "requestParameters":{
            "sourceIPAddress":"136.56.73.213"
         },
         "responseElements":{
            "x-amz-request-id":"ZX6X98Q6NM9NQTP3",
            "x-amz-id-2":"ESLLtyT4N5cAPW+C9EXwtaeEWz6nq7eCA6txjZKlG2Q7xp2nHXQI69Od2B0PiYIbhUiX26NrpIQPV0lLI6js3nVNmYo2SWBs"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"asdfasdf",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"A3PEG170DF9VNQ"
               },
               "arn":"arn:aws:s3:::nfox-testing-vector"
            },
            "object":{
               "key":"test-log.txt",
               "size":33,
               "eTag":"c981ce6672c4251048b0b834e334007f",
               "sequencer":"00623CC9C47AB5634C"
            }
         }
      }
   ]
}
        "#,
            )
            .unwrap();

            s3_event.records[0].s3.bucket.name.clone_from(&bucket);
            s3_event.records[0].s3.object.key.clone_from(&key);

            // send the SQS message (S3 itself usually sends this when an object is uploaded);
            // localstack does not do this automatically with the AWS SDK, so it is done manually here
            let _send_message_output = sqs_client
                .send_message()
                .queue_url(queue.clone())
                .message_body(serde_json::to_string(&s3_event).unwrap())
                .send()
                .await
                .unwrap();

            let (tx, rx) = SourceSender::new_test_finalize(status);
            let cx = SourceContext::new_test(tx, None);
            let namespace = cx.log_namespace(Some(log_namespace));
            let source = config.build(cx).await.unwrap();
            tokio::spawn(async move { source.await.unwrap() });

            let events = collect_n(rx, expected_lines.len()).await;

            assert_eq!(expected_lines.len(), events.len());
            for (i, event) in events.iter().enumerate() {
                if let Some(schema_definition) =
                    config.outputs(namespace).pop().unwrap().schema_definition
                {
                    schema_definition.is_valid_for_event(event).unwrap();
                }

                let message = expected_lines[i].as_str();

                let log = event.as_log();
                if log_namespace {
                    assert_eq!(log.value(), &Value::from(message));
                } else {
                    assert_eq!(log["message"], message.into());
                }
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("bucket"), path!("bucket"))
                        .unwrap(),
                    &bucket.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("object"), path!("object"))
                        .unwrap(),
                    &key.clone().into()
                );
                assert_eq!(
                    namespace
                        .get_source_metadata(AwsS3Config::NAME, log, path!("region"), path!("region"))
                        .unwrap(),
                    &"us-east-1".into()
                );
            }

            // Unfortunately we need a fairly large sleep here to ensure that the source has
            // actually managed to delete the SQS message. The deletion occurs after the event
            // has been sent out by the source, and there is no way of knowing when that has
            // finished other than waiting around for a while.
            tokio::time::sleep(Duration::from_secs(10)).await;
            // Make sure the SQS message is deleted
            match status {
                Errored => {
                    // need to wait up to the visibility timeout before it will be counted again
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                Rejected if !config.sqs.unwrap().delete_failed_message => {
                    assert_eq!(count_messages(&sqs, &queue, 10).await, 1);
                }
                _ => {
                    assert_eq!(count_messages(&sqs, &queue, 0).await, 0);
                }
            };
        }).await;
    }

    /// creates a new SQS queue
    ///
    /// returns the queue URL
    async fn create_queue(client: &SqsClient) -> String {
        let queue_name = uuid::Uuid::new_v4().to_string();

        let res = client
            .create_queue()
            .queue_name(queue_name.clone())
            .attributes(QueueAttributeName::VisibilityTimeout, "2")
            .send()
            .await
            .expect("Could not create queue");

        res.queue_url.expect("no queue url")
    }

    /// count the number of messages in an SQS queue
    async fn count_messages(client: &SqsClient, queue: &str, wait_time_seconds: i32) -> usize {
        let sqs_result = client
            .receive_message()
            .queue_url(queue)
            .visibility_timeout(0)
            .wait_time_seconds(wait_time_seconds)
            .send()
            .await
            .unwrap();

        sqs_result
            .messages
            .map(|messages| messages.len())
            .unwrap_or(0)
    }

    /// creates a new S3 bucket
    ///
    /// returns the bucket name
    async fn create_bucket(client: &S3Client) -> String {
        let bucket_name = uuid::Uuid::new_v4().to_string();

        client
            .create_bucket()
            .bucket(bucket_name.clone())
            .send()
            .await
            .expect("Could not create bucket");

        bucket_name
    }

    async fn s3_client() -> S3Client {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        let force_path_style_value: bool = true;
        create_client::<S3ClientBuilder>(
            &S3ClientBuilder {
                force_path_style: Some(force_path_style_value),
            },
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }

    async fn sqs_client() -> SqsClient {
        let auth = AwsAuthentication::test_auth();
        let region_endpoint = RegionOrEndpoint {
            region: Some("us-east-1".to_owned()),
            endpoint: Some(s3_address()),
        };
        let proxy_config = ProxyConfig::default();
        create_client::<SqsClientBuilder>(
            &SqsClientBuilder {},
            &auth,
            region_endpoint.region(),
            region_endpoint.endpoint(),
            &proxy_config,
            None,
            None,
        )
        .await
        .unwrap()
    }
}