// vector/sources/aws_kinesis_firehose/mod.rs

1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9    codecs::decoding::{DeserializerConfig, FramingConfig},
10    config::{LegacyKey, LogNamespace},
11    configurable::configurable_component,
12    lookup::owned_value_path,
13    sensitive_string::SensitiveString,
14    tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19    codecs::DecodingConfig,
20    config::{
21        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22        SourceOutput,
23    },
24    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25    serde::{bool_or_struct, default_decoding, default_framing_message_based},
26    tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
/// Configuration for the `aws_kinesis_firehose` source.
#[configurable_component(source(
    "aws_kinesis_firehose",
    "Collect logs from AWS Kinesis Firehose."
))]
#[derive(Clone, Debug)]
pub struct AwsKinesisFirehoseConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
    #[configurable(metadata(docs::examples = "localhost:443"))]
    address: SocketAddr,

    /// An access key to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If
    /// configured, `access_key` should be set to the same value. Otherwise, all requests are allowed.
    // Kept for backward compatibility; merged with `access_keys` in `build()`.
    #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    access_key: Option<SensitiveString>,

    /// A list of access keys to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If
    /// configured, `access_keys` should be set to the same value. Otherwise, all requests are allowed.
    #[configurable(metadata(docs::examples = "access_keys_example()"))]
    access_keys: Option<Vec<SensitiveString>>,

    /// Whether or not to store the AWS Firehose Access Key in event secrets.
    ///
    /// If set to `true`, when incoming requests contains an access key sent by AWS Firehose, it is kept in the
    /// event secrets as "aws_kinesis_firehose_access_key".
    #[configurable(derived)]
    store_access_key: bool,

    /// The compression scheme to use for decompressing records within the Firehose message.
    ///
    /// Some services, like AWS CloudWatch Logs, [compresses the events with gzip][events_with_gzip],
    /// before sending them AWS Kinesis Firehose. This option can be used to automatically decompress
    /// them before forwarding them to the next component.
    ///
    /// Note that this is different from [Content encoding option][encoding_option] of the
    /// Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.
    ///
    /// [events_with_gzip]: https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html
    /// [encoding_option]: https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http
    // Defaults to `Compression::Auto` via the enum's `#[derivative(Default)]`.
    #[serde(default)]
    record_compression: Compression,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
108
/// Example values surfaced in the generated docs for the `access_keys` option.
const fn access_keys_example() -> [&'static str; 2] {
    const FIRST: &str = "A94A8FE5CCB19BA61C4C08";
    const SECOND: &str = "B94B8FE5CCB19BA61C4C12";
    [FIRST, SECOND]
}
112
/// Compression scheme for records in a Firehose message.
// Serialized names come from `rename_all = "lowercase"` (auto/none/gzip).
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined by looking at its file signature, also known
    /// as [magic bytes][magic_bytes].
    ///
    /// If the record fails to decompress with the discovered format, the record is forwarded as is.
    /// Thus, if you know the records are always gzip encoded (for example, if they are coming from AWS CloudWatch Logs),
    /// set `gzip` in this field so that any records that are not-gzipped are rejected.
    ///
    /// [magic_bytes]: https://en.wikipedia.org/wiki/List_of_file_signatures
    // `Auto` is the default variant (marked via `derivative`).
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,
}
139
140impl fmt::Display for Compression {
141    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
142        match self {
143            Compression::Auto => write!(fmt, "auto"),
144            Compression::None => write!(fmt, "none"),
145            Compression::Gzip => write!(fmt, "gzip"),
146        }
147    }
148}
149
#[async_trait::async_trait]
#[typetag::serde(name = "aws_kinesis_firehose")]
impl SourceConfig for AwsKinesisFirehoseConfig {
    /// Builds the running source: binds the listener, assembles the warp
    /// filter chain, and returns the HTTP server future.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

        // Surface use of the deprecated single-key option at startup.
        if self.access_key.is_some() {
            warn!("DEPRECATION `access_key`, use `access_keys` instead.")
        }

        // Merge with legacy `access_key`
        let access_keys = self
            .access_keys
            .iter()
            .flatten()
            .chain(self.access_key.iter());

        let svc = filters::firehose(
            access_keys.map(|key| key.inner().to_string()).collect(),
            self.store_access_key,
            self.record_compression,
            decoder,
            acknowledgements,
            cx.out,
            log_namespace,
        );

        // Bind here (not inside the returned future) so bind errors fail `build`.
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            let span = Span::current();
            // Per-connection service: trace layer plus an optional layer that
            // closes connections older than `max_connection_age_secs`.
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(svc.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            // Serve until the shutdown signal resolves; errors are logged and
            // mapped to the unit error the source contract expects.
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Declares the output schema: decoder schema plus the `request_id` and
    /// `source_arn` metadata fields this source adds to every event.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition = self
            .decoding
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
                &owned_value_path!("request_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
                &owned_value_path!("source_arn"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    /// The TCP port this source claims, for config-level conflict detection.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    /// End-to-end acknowledgements are supported (reflected in HTTP status).
    fn can_acknowledge(&self) -> bool {
        true
    }
}
249
250impl GenerateConfig for AwsKinesisFirehoseConfig {
251    fn generate_config() -> toml::Value {
252        toml::Value::try_from(Self {
253            address: "0.0.0.0:443".parse().unwrap(),
254            access_key: None,
255            access_keys: None,
256            store_access_key: false,
257            tls: None,
258            record_compression: Default::default(),
259            framing: default_framing_message_based(),
260            decoding: default_decoding(),
261            acknowledgements: Default::default(),
262            log_namespace: None,
263            keepalive: Default::default(),
264        })
265        .unwrap()
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    #![allow(clippy::print_stdout)] //tests
272
273    use std::{
274        io::{Cursor, Read},
275        net::SocketAddr,
276    };
277
278    use base64::prelude::{BASE64_STANDARD, Engine as _};
279    use bytes::Bytes;
280    use chrono::{DateTime, SubsecRound, Utc};
281    use flate2::read::GzEncoder;
282    use futures::Stream;
283    use similar_asserts::assert_eq;
284    use tokio::time::{Duration, sleep};
285    use vector_lib::{assert_event_data_eq, lookup::path};
286    use vrl::value;
287
288    use super::*;
289    use crate::{
290        SourceSender,
291        event::{Event, EventStatus},
292        log_event,
293        test_util::{
294            addr::{PortGuard, next_addr},
295            collect_ready,
296            components::{SOURCE_TAGS, assert_source_compliance},
297            wait_for_tcp,
298        },
299    };
300
    // Delivery-stream ARN and request id echoed in every simulated request.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
    // example CloudWatch Logs subscription event
    const RECORD: &str = r#"
            {
                "messageType": "DATA_MESSAGE",
                "owner": "071959437513",
                "logGroup": "/jesse/test",
                "logStream": "test",
                "subscriptionFilters": ["Destination"],
                "logEvents": [
                    {
                        "id": "35683658089614582423604394983260738922885519999578275840",
                        "timestamp": 1600110569039,
                        "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
                    },
                    {
                        "id": "35683658089659183914001456229543810359430816722590236673",
                        "timestamp": 1600110569041,
                        "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
                    }
                ]
            }
        "#;
325
    #[test]
    fn generate_config() {
        // The generated example must round-trip through the config deserializer.
        crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
    }
330
    /// Spawns an `aws_kinesis_firehose` source bound to an ephemeral local port
    /// and returns its event stream, bound address, and the port guard.
    async fn source(
        access_key: Option<SensitiveString>,
        access_keys: Option<Vec<SensitiveString>>,
        store_access_key: bool,
        record_compression: Compression,
        delivered: bool,
        log_namespace: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
        use EventStatus::*;
        // `delivered` sets the acknowledgement status the test sink reports,
        // which drives the HTTP status the source returns to the caller.
        let status = if delivered { Delivered } else { Rejected };
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let (_guard, address) = next_addr();
        let cx = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            AwsKinesisFirehoseConfig {
                address,
                tls: None,
                access_key,
                access_keys,
                store_access_key,
                record_compression,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: true.into(),
                log_namespace: Some(log_namespace),
                keepalive: Default::default(),
            }
            .build(cx)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        // Wait for the component to bind to the port
        wait_for_tcp(address).await;
        (recv, address, _guard)
    }
368
    /// Sends the body to the address with the appropriate Firehose headers
    ///
    /// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
    async fn send(
        address: SocketAddr,
        timestamp: DateTime<Utc>,
        records: Vec<&[u8]>,
        key: Option<&str>,
        gzip: bool,
        record_compression: Compression,
    ) -> reqwest::Result<reqwest::Response> {
        // `record_compression` encodes each individual record payload; `gzip`
        // compresses the whole HTTP request body (a separate concern).
        let request = models::FirehoseRequest {
            access_key: key.map(|s| s.to_string()),
            request_id: REQUEST_ID.to_string(),
            timestamp,
            records: records
                .into_iter()
                .map(|record| models::EncodedFirehoseRecord {
                    data: encode_record(record, record_compression).unwrap(),
                })
                .collect(),
        };

        let mut builder = reqwest::Client::new()
            .post(format!("http://{address}"))
            .header("host", address.to_string())
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
            .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json");

        // The key travels both in the JSON body above and in this header.
        if let Some(key) = key {
            builder = builder.header("x-amz-firehose-access-key", key);
        }

        if gzip {
            let mut gz = GzEncoder::new(
                Cursor::new(serde_json::to_vec(&request).unwrap()),
                flate2::Compression::fast(),
            );
            let mut buffer = Vec::new();
            gz.read_to_end(&mut buffer).unwrap();
            builder = builder.header("content-encoding", "gzip").body(buffer);
        } else {
            builder = builder.json(&request);
        }

        builder.send().await
    }
423
424    async fn spawn_send(
425        address: SocketAddr,
426        timestamp: DateTime<Utc>,
427        records: Vec<&'static [u8]>,
428        key: Option<&'static str>,
429        gzip: bool,
430        record_compression: Compression,
431    ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
432        let handle = tokio::spawn(async move {
433            send(address, timestamp, records, key, gzip, record_compression).await
434        });
435        sleep(Duration::from_millis(500)).await;
436        handle
437    }
438
439    /// Encodes record data to mach AWS's representation: base64 encoded with an additional
440    /// compression
441    fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
442        let compressed = match compression {
443            Compression::Auto => panic!("cannot encode records as Auto"),
444            Compression::Gzip => {
445                let mut buffer = Vec::new();
446                if !record.is_empty() {
447                    let mut gz = GzEncoder::new(record, flate2::Compression::fast());
448                    gz.read_to_end(&mut buffer)?;
449                }
450                buffer
451            }
452            Compression::None => record.to_vec(),
453        };
454
455        Ok(BASE64_STANDARD.encode(compressed))
456    }
457
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
        // Gzipped form of RECORD, expected when the source is configured with
        // `None` but the record arrives gzipped (it passes through untouched).
        let gzipped_record = {
            let mut buf = Vec::new();
            let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
            gz.read_to_end(&mut buf).unwrap();
            buf
        };

        // Table of (source compression setting, record encoding, expected
        // success, record sent, expected event payload).
        for (source_record_compression, record_compression, success, record, expected) in [
            (
                Compression::Auto,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Auto,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::None,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                gzipped_record,
            ),
            (
                Compression::None,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::None,
                false,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            // An empty record is encoded/decoded as empty, not an error.
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                "".as_bytes(),
                Vec::new(),
            ),
        ] {
            let (rx, addr, _guard) =
                source(None, None, false, source_record_compression, true, false).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![record],
                None,
                false,
                record_compression,
            )
            .await;

            if success {
                let events = collect_ready(rx).await;

                let res = res.await.unwrap().unwrap();
                assert_eq!(200, res.status().as_u16());

                assert_event_data_eq!(
                    events,
                    vec![log_event! {
                        "source_type" => Bytes::from("aws_kinesis_firehose"),
                        "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                        "message" => Bytes::from(expected),
                        "request_id" => REQUEST_ID,
                        "source_arn" => SOURCE_ARN,
                    },]
                );

                let response: models::FirehoseResponse = res.json().await.unwrap();
                assert_eq!(response.request_id, REQUEST_ID);
            } else {
                // An undecodable record surfaces as an HTTP 400.
                let res = res.await.unwrap().unwrap();
                assert_eq!(400, res.status().as_u16());
            }
        }
    }
558
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
        // Gzipped form of RECORD, expected when the source is configured with
        // `None` but the record arrives gzipped (it passes through untouched).
        let gzipped_record = {
            let mut buf = Vec::new();
            let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
            gz.read_to_end(&mut buf).unwrap();
            buf
        };

        // Same case table as the legacy-namespace test; here the source runs
        // with the Vector log namespace, so assertions target event metadata.
        for (source_record_compression, record_compression, success, record, expected) in [
            (
                Compression::Auto,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Auto,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::None,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                gzipped_record,
            ),
            (
                Compression::None,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::None,
                false,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                "".as_bytes(),
                Vec::new(),
            ),
        ] {
            let (rx, addr, _guard) =
                source(None, None, false, source_record_compression, true, true).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![record],
                None,
                false,
                record_compression,
            )
            .await;

            if success {
                let events = collect_ready(rx).await;

                let res = res.await.unwrap().unwrap();
                assert_eq!(200, res.status().as_u16());

                for event in events {
                    let log = event.as_log();
                    let meta = log.metadata();

                    // event data, currently assumes default bytes deserializer
                    assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));

                    // vector metadata
                    assert_eq!(
                        meta.value().get(path!("vector", "source_type")).unwrap(),
                        &value!("aws_kinesis_firehose")
                    );
                    assert!(
                        meta.value()
                            .get(path!("vector", "ingest_timestamp"))
                            .unwrap()
                            .is_timestamp()
                    );

                    // source metadata
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "request_id"))
                            .unwrap(),
                        &value!(REQUEST_ID)
                    );
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "source_arn"))
                            .unwrap(),
                        &value!(SOURCE_ARN)
                    );
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "timestamp"))
                            .unwrap(),
                        &value!(timestamp.trunc_subsecs(3))
                    );
                }

                let response: models::FirehoseResponse = res.json().await.unwrap();
                assert_eq!(response.request_id, REQUEST_ID);
            } else {
                // An undecodable record surfaces as an HTTP 400.
                let res = res.await.unwrap().unwrap();
                assert_eq!(400, res.status().as_u16());
            }
        }
    }
688
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_gzip_request() {
        // Gzip here applies to the whole HTTP request body (content-encoding),
        // not to the individual records, which are sent uncompressed.
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (rx, addr, _guard) =
                source(None, None, false, Default::default(), true, false).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![RECORD.as_bytes()],
                None,
                true,
                Compression::None,
            )
            .await;

            let events = collect_ready(rx).await;
            let res = res.await.unwrap().unwrap();
            assert_eq!(200, res.status().as_u16());

            assert_event_data_eq!(
                events,
                vec![log_event! {
                    "source_type" => Bytes::from("aws_kinesis_firehose"),
                    "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                    "message"=> RECORD,
                    "request_id" => REQUEST_ID,
                    "source_arn" => SOURCE_ARN,
                },]
            );

            let response: models::FirehoseResponse = res.json().await.unwrap();
            assert_eq!(response.request_id, REQUEST_ID);
        })
        .await;
    }
727
    #[tokio::test]
    async fn aws_kinesis_firehose_rejects_bad_access_key() {
        // A key matching neither `access_key` nor `access_keys` is rejected.
        let (_rx, addr, _guard) = source(
            Some("an access key".to_string().into()),
            Some(vec!["an access key in list".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some("bad access key"),
            false,
            Compression::None,
        )
        .await
        .unwrap();
        // Unauthorized, but the response still echoes the request id.
        assert_eq!(401, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
755
    #[tokio::test]
    async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
        // Same as above, but only the list-based `access_keys` is configured.
        let (_rx, addr, _guard) = source(
            None,
            Some(vec!["an access key in list".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some("bad access key"),
            false,
            Compression::None,
        )
        .await
        .unwrap();
        // Unauthorized, but the response still echoes the request id.
        assert_eq!(401, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
783
    #[tokio::test]
    async fn aws_kinesis_firehose_accepts_merged_access_keys() {
        // The deprecated `access_key` is merged with `access_keys` at build
        // time, so a request carrying the legacy key must still be accepted.
        let valid_access_key = SensitiveString::from(String::from("an access key in list"));

        let (_rx, addr, _guard) = source(
            Some(valid_access_key.clone()),
            Some(vec!["valid access key 2".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some(valid_access_key.clone().inner()),
            false,
            Compression::None,
        )
        .await
        .unwrap();

        assert_eq!(200, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
814
    #[tokio::test]
    async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
        // Any key present in the configured `access_keys` list is accepted.
        let valid_access_key = "an access key in list".to_string();

        let (_rx, addr, _guard) = source(
            None,
            Some(vec![
                valid_access_key.clone().into(),
                "valid access key 2".to_string().into(),
            ]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some(&valid_access_key),
            false,
            Compression::None,
        )
        .await
        .unwrap();

        assert_eq!(200, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
848
    #[tokio::test]
    async fn handles_acknowledgement_failure() {
        let expected = RECORD.as_bytes().to_owned();

        // `delivered = false` makes the test sink mark events as Rejected.
        let (rx, addr, _guard) = source(None, None, false, Compression::None, false, false).await;

        let timestamp: DateTime<Utc> = Utc::now();

        let res = spawn_send(
            addr,
            timestamp,
            vec![RECORD.as_bytes()],
            None,
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;

        // A rejected acknowledgement surfaces to the HTTP caller as 406.
        let res = res.await.unwrap().unwrap();
        assert_eq!(406, res.status().as_u16());

        // The event is still emitted downstream despite the failed ack.
        assert_event_data_eq!(
            events,
            vec![log_event! {
                "source_type" => Bytes::from("aws_kinesis_firehose"),
                "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                "message"=> Bytes::from(expected),
                "request_id" => REQUEST_ID,
                "source_arn" => SOURCE_ARN,
            },]
        );

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
886
    #[tokio::test]
    async fn event_access_key_passthrough_enabled() {
        // With `store_access_key = true`, the request's access key is stored
        // in the event's secrets under "aws_kinesis_firehose_access_key".
        let (rx, address, _guard) = source(
            None,
            Some(vec!["an access key".to_string().into()]),
            true,
            Default::default(),
            true,
            true,
        )
        .await;

        let timestamp: DateTime<Utc> = Utc::now();

        spawn_send(
            address,
            timestamp,
            vec![RECORD.as_bytes()],
            Some("an access key"),
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;
        let access_key = events[0]
            .metadata()
            .secrets()
            .get("aws_kinesis_firehose_access_key")
            .unwrap();
        assert_eq!(access_key.to_string(), "an access key".to_string());
    }
919
    #[tokio::test]
    async fn no_authorization_access_key_passthrough_enabled() {
        // No access keys configured: unauthenticated requests are accepted.
        let (rx, address, _guard) = source(None, None, true, Default::default(), true, true).await;

        let timestamp: DateTime<Utc> = Utc::now();

        spawn_send(
            address,
            timestamp,
            vec![RECORD.as_bytes()],
            None,
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;

        // The request carried no key, so even with `store_access_key = true`
        // there is no secret to store on the event.
        assert!(
            events[0]
                .metadata()
                .secrets()
                .get("aws_kinesis_firehose_access_key")
                .is_none()
        );
    }
946}