vector/sources/aws_kinesis_firehose/
mod.rs

1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9    codecs::decoding::{DeserializerConfig, FramingConfig},
10    config::{LegacyKey, LogNamespace},
11    configurable::configurable_component,
12    lookup::owned_value_path,
13    sensitive_string::SensitiveString,
14    tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19    codecs::DecodingConfig,
20    config::{
21        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22        SourceOutput,
23    },
24    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25    serde::{bool_or_struct, default_decoding, default_framing_message_based},
26    tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
/// Configuration for the `aws_kinesis_firehose` source.
#[configurable_component(source(
    "aws_kinesis_firehose",
    "Collect logs from AWS Kinesis Firehose."
))]
#[derive(Clone, Debug)]
pub struct AwsKinesisFirehoseConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
    #[configurable(metadata(docs::examples = "localhost:443"))]
    address: SocketAddr,

    /// An access key to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If
    /// configured, `access_key` should be set to the same value. Otherwise, all requests are allowed.
    #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    access_key: Option<SensitiveString>,

    /// A list of access keys to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request. If
    /// configured, `access_keys` should be set to the same value. Otherwise, all requests are allowed.
    #[configurable(metadata(docs::examples = "access_keys_example()"))]
    access_keys: Option<Vec<SensitiveString>>,

    /// Whether or not to store the AWS Firehose Access Key in event secrets.
    ///
    /// If set to `true`, when an incoming request contains an access key sent by AWS Firehose, it is kept in the
    /// event secrets as "aws_kinesis_firehose_access_key".
    #[configurable(derived)]
    store_access_key: bool,

    /// The compression scheme to use for decompressing records within the Firehose message.
    ///
    /// Some services, like AWS CloudWatch Logs, [compress the events with gzip][events_with_gzip],
    /// before sending them to AWS Kinesis Firehose. This option can be used to automatically decompress
    /// them before forwarding them to the next component.
    ///
    /// Note that this is different from the [Content encoding option][encoding_option] of the
    /// Firehose HTTP endpoint destination. That option controls the content encoding of the entire HTTP request.
    ///
    /// [events_with_gzip]: https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html
    /// [encoding_option]: https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http
    #[serde(default)]
    record_compression: Compression,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
108
// Example values referenced by the `docs::examples` metadata on `access_keys`.
const fn access_keys_example() -> [&'static str; 2] {
    ["A94A8FE5CCB19BA61C4C08", "B94B8FE5CCB19BA61C4C12"]
}
112
/// Compression scheme for records in a Firehose message.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// The compression scheme of the object is determined by looking at its file signature, also known
    /// as [magic bytes][magic_bytes].
    ///
    /// If the record fails to decompress with the discovered format, the record is forwarded as is.
    /// Thus, if you know the records are always gzip encoded (for example, if they are coming from AWS CloudWatch Logs),
    /// set `gzip` in this field so that any records that are not gzipped are rejected.
    ///
    /// [magic_bytes]: https://en.wikipedia.org/wiki/List_of_file_signatures
    #[default]
    Auto,

    /// Uncompressed.
    None,

    /// GZIP.
    Gzip,
}
138
139impl fmt::Display for Compression {
140    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
141        match self {
142            Compression::Auto => write!(fmt, "auto"),
143            Compression::None => write!(fmt, "none"),
144            Compression::Gzip => write!(fmt, "gzip"),
145        }
146    }
147}
148
#[async_trait::async_trait]
#[typetag::serde(name = "aws_kinesis_firehose")]
impl SourceConfig for AwsKinesisFirehoseConfig {
    /// Builds the running source: a warp-based HTTP server that accepts
    /// Firehose delivery requests and forwards decoded records to `cx.out`.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

        // Surface a runtime warning when the deprecated single-key option is set.
        if self.access_key.is_some() {
            warn!("DEPRECATION `access_key`, use `access_keys` instead.")
        }

        // Merge with legacy `access_key`: the configured list first, then the
        // deprecated single key (if any), so both remain accepted.
        let access_keys = self
            .access_keys
            .iter()
            .flatten()
            .chain(self.access_key.iter());

        let svc = filters::firehose(
            access_keys.map(|key| key.inner().to_string()).collect(),
            self.store_access_key,
            self.record_compression,
            decoder,
            acknowledgements,
            cx.out,
            log_namespace,
        );

        // Bind before returning so callers can rely on the port being open
        // once `build` resolves.
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        let shutdown = cx.shutdown;
        Ok(Box::pin(async move {
            let span = Span::current();
            // A fresh service stack is created per accepted connection; the
            // max-connection-age layer is only added when configured.
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(svc.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            // Serve until the shutdown signal resolves; errors are logged and
            // reported as a unit error to the topology.
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Declares the output schema: decoder-defined fields plus the
    /// source-specific `request_id` and `source_arn` metadata.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_definition = self
            .decoding
            .schema_definition(global_log_namespace.merge(self.log_namespace))
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
                &owned_value_path!("request_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
                &owned_value_path!("source_arn"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    /// The TCP port this source claims, for topology conflict detection.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    /// End-to-end acknowledgements are supported (via HTTP response codes).
    fn can_acknowledge(&self) -> bool {
        true
    }
}
248
249impl GenerateConfig for AwsKinesisFirehoseConfig {
250    fn generate_config() -> toml::Value {
251        toml::Value::try_from(Self {
252            address: "0.0.0.0:443".parse().unwrap(),
253            access_key: None,
254            access_keys: None,
255            store_access_key: false,
256            tls: None,
257            record_compression: Default::default(),
258            framing: default_framing_message_based(),
259            decoding: default_decoding(),
260            acknowledgements: Default::default(),
261            log_namespace: None,
262            keepalive: Default::default(),
263        })
264        .unwrap()
265    }
266}
267
268#[cfg(test)]
269mod tests {
270    #![allow(clippy::print_stdout)] //tests
271
272    use std::{
273        io::{Cursor, Read},
274        net::SocketAddr,
275    };
276
277    use base64::prelude::{BASE64_STANDARD, Engine as _};
278    use bytes::Bytes;
279    use chrono::{DateTime, SubsecRound, Utc};
280    use flate2::read::GzEncoder;
281    use futures::Stream;
282    use similar_asserts::assert_eq;
283    use tokio::time::{Duration, sleep};
284    use vector_lib::{assert_event_data_eq, lookup::path};
285    use vrl::value;
286
287    use super::*;
288    use crate::{
289        SourceSender,
290        event::{Event, EventStatus},
291        log_event,
292        test_util::{
293            addr::{PortGuard, next_addr},
294            collect_ready,
295            components::{SOURCE_TAGS, assert_source_compliance},
296            wait_for_tcp,
297        },
298    };
299
    // Delivery-stream ARN sent as the `x-amz-firehose-source-arn` header in tests.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    // Fixed request id; responses are asserted to echo it back.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
    // example CloudWatch Logs subscription event
    const RECORD: &str = r#"
            {
                "messageType": "DATA_MESSAGE",
                "owner": "071959437513",
                "logGroup": "/jesse/test",
                "logStream": "test",
                "subscriptionFilters": ["Destination"],
                "logEvents": [
                    {
                        "id": "35683658089614582423604394983260738922885519999578275840",
                        "timestamp": 1600110569039,
                        "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
                    },
                    {
                        "id": "35683658089659183914001456229543810359430816722590236673",
                        "timestamp": 1600110569041,
                        "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
                    }
                ]
            }
        "#;
324
    #[test]
    fn generate_config() {
        // Round-trips the example config produced by the `GenerateConfig` impl.
        crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
    }
329
    /// Spawns an `aws_kinesis_firehose` source bound to an ephemeral local port.
    ///
    /// Returns the stream of events the source emits, the bound address, and the
    /// guard that keeps the ephemeral port reserved for the test. `delivered`
    /// controls whether finalized events report `Delivered` or `Rejected`.
    async fn source(
        access_key: Option<SensitiveString>,
        access_keys: Option<Vec<SensitiveString>>,
        store_access_key: bool,
        record_compression: Compression,
        delivered: bool,
        log_namespace: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
        use EventStatus::*;
        let status = if delivered { Delivered } else { Rejected };
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let (_guard, address) = next_addr();
        let cx = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            AwsKinesisFirehoseConfig {
                address,
                tls: None,
                access_key,
                access_keys,
                store_access_key,
                record_compression,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: true.into(),
                log_namespace: Some(log_namespace),
                keepalive: Default::default(),
            }
            .build(cx)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        // Wait for the component to bind to the port
        wait_for_tcp(address).await;
        (recv, address, _guard)
    }
367
    /// Sends the body to the address with the appropriate Firehose headers
    ///
    /// https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html
    async fn send(
        address: SocketAddr,
        timestamp: DateTime<Utc>,
        records: Vec<&[u8]>,
        key: Option<&str>,
        gzip: bool,
        record_compression: Compression,
    ) -> reqwest::Result<reqwest::Response> {
        // Each record is individually (optionally) compressed and base64 encoded,
        // mirroring Firehose's own record representation.
        let request = models::FirehoseRequest {
            access_key: key.map(|s| s.to_string()),
            request_id: REQUEST_ID.to_string(),
            timestamp,
            records: records
                .into_iter()
                .map(|record| models::EncodedFirehoseRecord {
                    data: encode_record(record, record_compression).unwrap(),
                })
                .collect(),
        };

        let mut builder = reqwest::Client::new()
            .post(format!("http://{address}"))
            .header("host", address.to_string())
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
            .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json");

        if let Some(key) = key {
            builder = builder.header("x-amz-firehose-access-key", key);
        }

        // `gzip` controls the content encoding of the entire HTTP request body,
        // independent of the per-record compression applied above.
        if gzip {
            let mut gz = GzEncoder::new(
                Cursor::new(serde_json::to_vec(&request).unwrap()),
                flate2::Compression::fast(),
            );
            let mut buffer = Vec::new();
            gz.read_to_end(&mut buffer).unwrap();
            builder = builder.header("content-encoding", "gzip").body(buffer);
        } else {
            builder = builder.json(&request);
        }

        builder.send().await
    }
422
    /// Spawns `send` as a background task and returns its join handle.
    async fn spawn_send(
        address: SocketAddr,
        timestamp: DateTime<Utc>,
        records: Vec<&'static [u8]>,
        key: Option<&'static str>,
        gzip: bool,
        record_compression: Compression,
    ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
        let handle = tokio::spawn(async move {
            send(address, timestamp, records, key, gzip, record_compression).await
        });
        // NOTE(review): presumably gives the spawned request time to reach the
        // source before callers start collecting events — confirm if flaky.
        sleep(Duration::from_millis(500)).await;
        handle
    }
437
438    /// Encodes record data to mach AWS's representation: base64 encoded with an additional
439    /// compression
440    fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
441        let compressed = match compression {
442            Compression::Auto => panic!("cannot encode records as Auto"),
443            Compression::Gzip => {
444                let mut buffer = Vec::new();
445                if !record.is_empty() {
446                    let mut gz = GzEncoder::new(record, flate2::Compression::fast());
447                    gz.read_to_end(&mut buffer)?;
448                }
449                buffer
450            }
451            Compression::None => record.to_vec(),
452        };
453
454        Ok(BASE64_STANDARD.encode(compressed))
455    }
456
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
        let gzipped_record = {
            let mut buf = Vec::new();
            let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
            gz.read_to_end(&mut buf).unwrap();
            buf
        };

        // Cases: (source compression setting, record encoding, expect success,
        // record sent, expected event body). Notably, a source pinned to `gzip`
        // rejects plain records with 400, and `none` forwards gzipped bytes
        // through undecoded.
        for (source_record_compression, record_compression, success, record, expected) in [
            (
                Compression::Auto,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Auto,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::None,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                gzipped_record,
            ),
            (
                Compression::None,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::None,
                false,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                "".as_bytes(),
                Vec::new(),
            ),
        ] {
            let (rx, addr, _guard) =
                source(None, None, false, source_record_compression, true, false).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![record],
                None,
                false,
                record_compression,
            )
            .await;

            if success {
                let events = collect_ready(rx).await;

                let res = res.await.unwrap().unwrap();
                assert_eq!(200, res.status().as_u16());

                // Legacy namespace: metadata lands as top-level event fields.
                assert_event_data_eq!(
                    events,
                    vec![log_event! {
                        "source_type" => Bytes::from("aws_kinesis_firehose"),
                        "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                        "message" => Bytes::from(expected),
                        "request_id" => REQUEST_ID,
                        "source_arn" => SOURCE_ARN,
                    },]
                );

                let response: models::FirehoseResponse = res.json().await.unwrap();
                assert_eq!(response.request_id, REQUEST_ID);
            } else {
                let res = res.await.unwrap().unwrap();
                assert_eq!(400, res.status().as_u16());
            }
        }
    }
557
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
        let gzipped_record = {
            let mut buf = Vec::new();
            let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
            gz.read_to_end(&mut buf).unwrap();
            buf
        };

        // Same compression matrix as the legacy-namespace test, but asserting
        // that metadata is placed under the `vector` / `aws_kinesis_firehose`
        // metadata paths instead of top-level fields.
        for (source_record_compression, record_compression, success, record, expected) in [
            (
                Compression::Auto,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Auto,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::None,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                gzipped_record,
            ),
            (
                Compression::None,
                Compression::None,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::None,
                false,
                RECORD.as_bytes(),
                RECORD.as_bytes().to_owned(),
            ),
            (
                Compression::Gzip,
                Compression::Gzip,
                true,
                "".as_bytes(),
                Vec::new(),
            ),
        ] {
            let (rx, addr, _guard) =
                source(None, None, false, source_record_compression, true, true).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![record],
                None,
                false,
                record_compression,
            )
            .await;

            if success {
                let events = collect_ready(rx).await;

                let res = res.await.unwrap().unwrap();
                assert_eq!(200, res.status().as_u16());

                for event in events {
                    let log = event.as_log();
                    let meta = log.metadata();

                    // event data, currently assumes default bytes deserializer
                    assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));

                    // vector metadata
                    assert_eq!(
                        meta.value().get(path!("vector", "source_type")).unwrap(),
                        &value!("aws_kinesis_firehose")
                    );
                    assert!(
                        meta.value()
                            .get(path!("vector", "ingest_timestamp"))
                            .unwrap()
                            .is_timestamp()
                    );

                    // source metadata
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "request_id"))
                            .unwrap(),
                        &value!(REQUEST_ID)
                    );
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "source_arn"))
                            .unwrap(),
                        &value!(SOURCE_ARN)
                    );
                    assert_eq!(
                        meta.value()
                            .get(path!("aws_kinesis_firehose", "timestamp"))
                            .unwrap(),
                        &value!(timestamp.trunc_subsecs(3))
                    );
                }

                let response: models::FirehoseResponse = res.json().await.unwrap();
                assert_eq!(response.request_id, REQUEST_ID);
            } else {
                let res = res.await.unwrap().unwrap();
                assert_eq!(400, res.status().as_u16());
            }
        }
    }
687
    #[tokio::test]
    async fn aws_kinesis_firehose_forwards_events_gzip_request() {
        // Here the whole HTTP body is gzipped (`content-encoding: gzip`) while
        // the records themselves are uncompressed.
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (rx, addr, _guard) =
                source(None, None, false, Default::default(), true, false).await;

            let timestamp: DateTime<Utc> = Utc::now();

            let res = spawn_send(
                addr,
                timestamp,
                vec![RECORD.as_bytes()],
                None,
                true,
                Compression::None,
            )
            .await;

            let events = collect_ready(rx).await;
            let res = res.await.unwrap().unwrap();
            assert_eq!(200, res.status().as_u16());

            assert_event_data_eq!(
                events,
                vec![log_event! {
                    "source_type" => Bytes::from("aws_kinesis_firehose"),
                    "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                    "message"=> RECORD,
                    "request_id" => REQUEST_ID,
                    "source_arn" => SOURCE_ARN,
                },]
            );

            let response: models::FirehoseResponse = res.json().await.unwrap();
            assert_eq!(response.request_id, REQUEST_ID);
        })
        .await;
    }
726
    #[tokio::test]
    async fn aws_kinesis_firehose_rejects_bad_access_key() {
        // With both the legacy key and a key list configured, an unknown key
        // must be rejected with 401 while the request id is still echoed back.
        let (_rx, addr, _guard) = source(
            Some("an access key".to_string().into()),
            Some(vec!["an access key in list".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some("bad access key"),
            false,
            Compression::None,
        )
        .await
        .unwrap();
        assert_eq!(401, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
754
    #[tokio::test]
    async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
        // Same rejection behavior when only `access_keys` (no legacy key) is set.
        let (_rx, addr, _guard) = source(
            None,
            Some(vec!["an access key in list".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some("bad access key"),
            false,
            Compression::None,
        )
        .await
        .unwrap();
        assert_eq!(401, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
782
    #[tokio::test]
    async fn aws_kinesis_firehose_accepts_merged_access_keys() {
        // The deprecated `access_key` is merged with `access_keys`, so a request
        // authenticated with the legacy key must still succeed.
        let valid_access_key = SensitiveString::from(String::from("an access key in list"));

        let (_rx, addr, _guard) = source(
            Some(valid_access_key.clone()),
            Some(vec!["valid access key 2".to_string().into()]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some(valid_access_key.clone().inner()),
            false,
            Compression::None,
        )
        .await
        .unwrap();

        assert_eq!(200, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
813
    #[tokio::test]
    async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
        // Any key present in the configured `access_keys` list is accepted.
        let valid_access_key = "an access key in list".to_string();

        let (_rx, addr, _guard) = source(
            None,
            Some(vec![
                valid_access_key.clone().into(),
                "valid access key 2".to_string().into(),
            ]),
            Default::default(),
            Default::default(),
            true,
            false,
        )
        .await;

        let res = send(
            addr,
            Utc::now(),
            vec![],
            Some(&valid_access_key),
            false,
            Compression::None,
        )
        .await
        .unwrap();

        assert_eq!(200, res.status().as_u16());

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
847
    #[tokio::test]
    async fn handles_acknowledgement_failure() {
        let expected = RECORD.as_bytes().to_owned();

        // `delivered: false` makes downstream finalization report `Rejected`,
        // which the source surfaces to Firehose as HTTP 406.
        let (rx, addr, _guard) = source(None, None, false, Compression::None, false, false).await;

        let timestamp: DateTime<Utc> = Utc::now();

        let res = spawn_send(
            addr,
            timestamp,
            vec![RECORD.as_bytes()],
            None,
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;

        let res = res.await.unwrap().unwrap();
        assert_eq!(406, res.status().as_u16());

        // The event is still emitted even though its acknowledgement failed.
        assert_event_data_eq!(
            events,
            vec![log_event! {
                "source_type" => Bytes::from("aws_kinesis_firehose"),
                "timestamp" => timestamp.trunc_subsecs(3), // AWS sends timestamps as ms
                "message"=> Bytes::from(expected),
                "request_id" => REQUEST_ID,
                "source_arn" => SOURCE_ARN,
            },]
        );

        let response: models::FirehoseResponse = res.json().await.unwrap();
        assert_eq!(response.request_id, REQUEST_ID);
    }
885
    #[tokio::test]
    async fn event_access_key_passthrough_enabled() {
        // With `store_access_key: true`, the request's access key is stored in
        // event secrets under "aws_kinesis_firehose_access_key".
        let (rx, address, _guard) = source(
            None,
            Some(vec!["an access key".to_string().into()]),
            true,
            Default::default(),
            true,
            true,
        )
        .await;

        let timestamp: DateTime<Utc> = Utc::now();

        spawn_send(
            address,
            timestamp,
            vec![RECORD.as_bytes()],
            Some("an access key"),
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;
        let access_key = events[0]
            .metadata()
            .secrets()
            .get("aws_kinesis_firehose_access_key")
            .unwrap();
        assert_eq!(access_key.to_string(), "an access key".to_string());
    }
918
    #[tokio::test]
    async fn no_authorization_access_key_passthrough_enabled() {
        // Even with `store_access_key: true`, a request that carries no access
        // key must not create the secret entry.
        let (rx, address, _guard) = source(None, None, true, Default::default(), true, true).await;

        let timestamp: DateTime<Utc> = Utc::now();

        spawn_send(
            address,
            timestamp,
            vec![RECORD.as_bytes()],
            None,
            false,
            Compression::None,
        )
        .await;

        let events = collect_ready(rx).await;

        assert!(
            events[0]
                .metadata()
                .secrets()
                .get("aws_kinesis_firehose_access_key")
                .is_none()
        );
    }
945}