1use std::time::Duration;
2use std::{convert::Infallible, fmt, net::SocketAddr};
3
4use futures::FutureExt;
5use hyper::{service::make_service_fn, Server};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
10use vector_lib::config::{LegacyKey, LogNamespace};
11use vector_lib::configurable::configurable_component;
12use vector_lib::lookup::owned_value_path;
13use vector_lib::sensitive_string::SensitiveString;
14use vector_lib::tls::MaybeTlsIncomingStream;
15use vrl::value::Kind;
16
17use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
18use crate::{
19 codecs::DecodingConfig,
20 config::{
21 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22 SourceOutput,
23 },
24 http::build_http_trace_layer,
25 serde::{bool_or_struct, default_decoding, default_framing_message_based},
26 tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
/// Configuration for the `aws_kinesis_firehose` source.
#[configurable_component(source(
    "aws_kinesis_firehose",
    "Collect logs from AWS Kinesis Firehose."
))]
#[derive(Clone, Debug)]
pub struct AwsKinesisFirehoseConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
    #[configurable(metadata(docs::examples = "localhost:443"))]
    address: SocketAddr,

    /// An access key to authenticate requests against.
    ///
    /// When set, this key is accepted in addition to any keys listed in `access_keys`.
    #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    access_key: Option<SensitiveString>,

    /// A list of access keys to authenticate requests against.
    ///
    /// A request carrying a key that is not configured is answered with HTTP 401 (see the tests in this
    /// module). When no key is configured at all, requests without a key are accepted.
    #[configurable(metadata(docs::examples = "access_keys_example()"))]
    access_keys: Option<Vec<SensitiveString>>,

    /// Whether or not to store the access key sent with a request in the event secrets.
    ///
    /// When `true` and the incoming request carries an access key, the key is kept in the event secrets
    /// under `aws_kinesis_firehose_access_key`.
    #[configurable(derived)]
    store_access_key: bool,

    /// The compression scheme to use for decompressing records within the Firehose message.
    ///
    /// This applies to each individual record, not to the content encoding of the HTTP request as a whole.
    #[serde(default)]
    record_compression: Compression,

    // Optional TLS settings used when binding the listener.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Framing used to build the record decoder.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    // Deserializer used to build the record decoder; it also determines the output type and schema.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // End-to-end acknowledgement settings; accepts either a bare boolean or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This is merged with the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Connection keepalive settings (maximum connection age and its jitter factor).
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
108
/// Sample access keys; referenced by name from the `docs::examples` metadata of `access_keys`.
const fn access_keys_example() -> [&'static str; 2] {
    const FIRST_KEY: &str = "A94A8FE5CCB19BA61C4C08";
    const SECOND_KEY: &str = "B94B8FE5CCB19BA61C4C12";

    [FIRST_KEY, SECOND_KEY]
}
112
/// Compression scheme for records in a Firehose message.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// This is the default. The tests in this module show both gzip-compressed and uncompressed
    /// records being accepted and decoded in this mode.
    #[derivative(Default)]
    Auto,

    /// Uncompressed.
    ///
    /// Records are forwarded as received; a gzip-compressed record is passed through still compressed.
    None,

    /// GZIP.
    ///
    /// Records that are not gzip-compressed are rejected with HTTP 400 (see the tests in this module).
    Gzip,
}
139
140impl fmt::Display for Compression {
141 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
142 match self {
143 Compression::Auto => write!(fmt, "auto"),
144 Compression::None => write!(fmt, "none"),
145 Compression::Gzip => write!(fmt, "gzip"),
146 }
147 }
148}
149
150#[async_trait::async_trait]
151#[typetag::serde(name = "aws_kinesis_firehose")]
152impl SourceConfig for AwsKinesisFirehoseConfig {
153 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
154 let log_namespace = cx.log_namespace(self.log_namespace);
155 let decoder =
156 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
157 .build()?;
158
159 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
160
161 if self.access_key.is_some() {
162 warn!("DEPRECATION `access_key`, use `access_keys` instead.")
163 }
164
165 let access_keys = self
167 .access_keys
168 .iter()
169 .flatten()
170 .chain(self.access_key.iter());
171
172 let svc = filters::firehose(
173 access_keys.map(|key| key.inner().to_string()).collect(),
174 self.store_access_key,
175 self.record_compression,
176 decoder,
177 acknowledgements,
178 cx.out,
179 log_namespace,
180 );
181
182 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
183 let listener = tls.bind(&self.address).await?;
184
185 let keepalive_settings = self.keepalive.clone();
186 let shutdown = cx.shutdown;
187 Ok(Box::pin(async move {
188 let span = Span::current();
189 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
190 let svc = ServiceBuilder::new()
191 .layer(build_http_trace_layer(span.clone()))
192 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
193 MaxConnectionAgeLayer::new(
194 Duration::from_secs(secs),
195 keepalive_settings.max_connection_age_jitter_factor,
196 conn.peer_addr(),
197 )
198 }))
199 .service(warp::service(svc.clone()));
200 futures_util::future::ok::<_, Infallible>(svc)
201 });
202
203 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
204 .serve(make_svc)
205 .with_graceful_shutdown(shutdown.map(|_| ()))
206 .await
207 .map_err(|err| {
208 error!("An error occurred: {:?}.", err);
209 })?;
210
211 Ok(())
212 }))
213 }
214
215 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
216 let schema_definition = self
217 .decoding
218 .schema_definition(global_log_namespace.merge(self.log_namespace))
219 .with_standard_vector_source_metadata()
220 .with_source_metadata(
221 Self::NAME,
222 Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
223 &owned_value_path!("request_id"),
224 Kind::bytes(),
225 None,
226 )
227 .with_source_metadata(
228 Self::NAME,
229 Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
230 &owned_value_path!("source_arn"),
231 Kind::bytes(),
232 None,
233 );
234
235 vec![SourceOutput::new_maybe_logs(
236 self.decoding.output_type(),
237 schema_definition,
238 )]
239 }
240
241 fn resources(&self) -> Vec<Resource> {
242 vec![Resource::tcp(self.address)]
243 }
244
245 fn can_acknowledge(&self) -> bool {
246 true
247 }
248}
249
250impl GenerateConfig for AwsKinesisFirehoseConfig {
251 fn generate_config() -> toml::Value {
252 toml::Value::try_from(Self {
253 address: "0.0.0.0:443".parse().unwrap(),
254 access_key: None,
255 access_keys: None,
256 store_access_key: false,
257 tls: None,
258 record_compression: Default::default(),
259 framing: default_framing_message_based(),
260 decoding: default_decoding(),
261 acknowledgements: Default::default(),
262 log_namespace: None,
263 keepalive: Default::default(),
264 })
265 .unwrap()
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 #![allow(clippy::print_stdout)] use std::{
274 io::{Cursor, Read},
275 net::SocketAddr,
276 };
277
278 use base64::prelude::{Engine as _, BASE64_STANDARD};
279 use bytes::Bytes;
280 use chrono::{DateTime, SubsecRound, Utc};
281 use flate2::read::GzEncoder;
282 use futures::Stream;
283 use similar_asserts::assert_eq;
284 use tokio::time::{sleep, Duration};
285 use vector_lib::assert_event_data_eq;
286 use vector_lib::lookup::path;
287 use vrl::value;
288
289 use super::*;
290 use crate::{
291 event::{Event, EventStatus},
292 log_event,
293 test_util::{
294 collect_ready,
295 components::{assert_source_compliance, SOURCE_TAGS},
296 next_addr, wait_for_tcp,
297 },
298 SourceSender,
299 };
300
    /// Value sent in the `x-amz-firehose-source-arn` header of every test request.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    /// Request id sent in the `x-amz-firehose-request-id` header and in the request body.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
    /// Sample record payload used as the record body throughout these tests.
    const RECORD: &str = r#"
 {
 "messageType": "DATA_MESSAGE",
 "owner": "071959437513",
 "logGroup": "/jesse/test",
 "logStream": "test",
 "subscriptionFilters": ["Destination"],
 "logEvents": [
 {
 "id": "35683658089614582423604394983260738922885519999578275840",
 "timestamp": 1600110569039,
 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
 },
 {
 "id": "35683658089659183914001456229543810359430816722590236673",
 "timestamp": 1600110569041,
 "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
 }
 ]
 }
 "#;
325
326 #[test]
327 fn generate_config() {
328 crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
329 }
330
331 async fn source(
332 access_key: Option<SensitiveString>,
333 access_keys: Option<Vec<SensitiveString>>,
334 store_access_key: bool,
335 record_compression: Compression,
336 delivered: bool,
337 log_namespace: bool,
338 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
339 use EventStatus::*;
340 let status = if delivered { Delivered } else { Rejected };
341 let (sender, recv) = SourceSender::new_test_finalize(status);
342 let address = next_addr();
343 let cx = SourceContext::new_test(sender, None);
344 tokio::spawn(async move {
345 AwsKinesisFirehoseConfig {
346 address,
347 tls: None,
348 access_key,
349 access_keys,
350 store_access_key,
351 record_compression,
352 framing: default_framing_message_based(),
353 decoding: default_decoding(),
354 acknowledgements: true.into(),
355 log_namespace: Some(log_namespace),
356 keepalive: Default::default(),
357 }
358 .build(cx)
359 .await
360 .unwrap()
361 .await
362 .unwrap()
363 });
364 wait_for_tcp(address).await;
365 (recv, address)
366 }
367
368 async fn send(
372 address: SocketAddr,
373 timestamp: DateTime<Utc>,
374 records: Vec<&[u8]>,
375 key: Option<&str>,
376 gzip: bool,
377 record_compression: Compression,
378 ) -> reqwest::Result<reqwest::Response> {
379 let request = models::FirehoseRequest {
380 access_key: key.map(|s| s.to_string()),
381 request_id: REQUEST_ID.to_string(),
382 timestamp,
383 records: records
384 .into_iter()
385 .map(|record| models::EncodedFirehoseRecord {
386 data: encode_record(record, record_compression).unwrap(),
387 })
388 .collect(),
389 };
390
391 let mut builder = reqwest::Client::new()
392 .post(format!("http://{address}"))
393 .header("host", address.to_string())
394 .header(
395 "x-amzn-trace-id",
396 "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
397 )
398 .header("x-amz-firehose-protocol-version", "1.0")
399 .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
400 .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
401 .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
402 .header("content-type", "application/json");
403
404 if let Some(key) = key {
405 builder = builder.header("x-amz-firehose-access-key", key);
406 }
407
408 if gzip {
409 let mut gz = GzEncoder::new(
410 Cursor::new(serde_json::to_vec(&request).unwrap()),
411 flate2::Compression::fast(),
412 );
413 let mut buffer = Vec::new();
414 gz.read_to_end(&mut buffer).unwrap();
415 builder = builder.header("content-encoding", "gzip").body(buffer);
416 } else {
417 builder = builder.json(&request);
418 }
419
420 builder.send().await
421 }
422
423 async fn spawn_send(
424 address: SocketAddr,
425 timestamp: DateTime<Utc>,
426 records: Vec<&'static [u8]>,
427 key: Option<&'static str>,
428 gzip: bool,
429 record_compression: Compression,
430 ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
431 let handle = tokio::spawn(async move {
432 send(address, timestamp, records, key, gzip, record_compression).await
433 });
434 sleep(Duration::from_millis(100)).await;
435 handle
436 }
437
438 fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
441 let compressed = match compression {
442 Compression::Auto => panic!("cannot encode records as Auto"),
443 Compression::Gzip => {
444 let mut buffer = Vec::new();
445 if !record.is_empty() {
446 let mut gz = GzEncoder::new(record, flate2::Compression::fast());
447 gz.read_to_end(&mut buffer)?;
448 }
449 buffer
450 }
451 Compression::None => record.to_vec(),
452 };
453
454 Ok(BASE64_STANDARD.encode(compressed))
455 }
456
457 #[tokio::test]
458 async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
459 let gzipped_record = {
460 let mut buf = Vec::new();
461 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
462 gz.read_to_end(&mut buf).unwrap();
463 buf
464 };
465
466 for (source_record_compression, record_compression, success, record, expected) in [
467 (
468 Compression::Auto,
469 Compression::Gzip,
470 true,
471 RECORD.as_bytes(),
472 RECORD.as_bytes().to_owned(),
473 ),
474 (
475 Compression::Auto,
476 Compression::None,
477 true,
478 RECORD.as_bytes(),
479 RECORD.as_bytes().to_owned(),
480 ),
481 (
482 Compression::None,
483 Compression::Gzip,
484 true,
485 RECORD.as_bytes(),
486 gzipped_record,
487 ),
488 (
489 Compression::None,
490 Compression::None,
491 true,
492 RECORD.as_bytes(),
493 RECORD.as_bytes().to_owned(),
494 ),
495 (
496 Compression::Gzip,
497 Compression::Gzip,
498 true,
499 RECORD.as_bytes(),
500 RECORD.as_bytes().to_owned(),
501 ),
502 (
503 Compression::Gzip,
504 Compression::None,
505 false,
506 RECORD.as_bytes(),
507 RECORD.as_bytes().to_owned(),
508 ),
509 (
510 Compression::Gzip,
511 Compression::Gzip,
512 true,
513 "".as_bytes(),
514 Vec::new(),
515 ),
516 ] {
517 let (rx, addr) =
518 source(None, None, false, source_record_compression, true, false).await;
519
520 let timestamp: DateTime<Utc> = Utc::now();
521
522 let res = spawn_send(
523 addr,
524 timestamp,
525 vec![record],
526 None,
527 false,
528 record_compression,
529 )
530 .await;
531
532 if success {
533 let events = collect_ready(rx).await;
534
535 let res = res.await.unwrap().unwrap();
536 assert_eq!(200, res.status().as_u16());
537
538 assert_event_data_eq!(
539 events,
540 vec![log_event! {
541 "source_type" => Bytes::from("aws_kinesis_firehose"),
542 "timestamp" => timestamp.trunc_subsecs(3), "message" => Bytes::from(expected),
544 "request_id" => REQUEST_ID,
545 "source_arn" => SOURCE_ARN,
546 },]
547 );
548
549 let response: models::FirehoseResponse = res.json().await.unwrap();
550 assert_eq!(response.request_id, REQUEST_ID);
551 } else {
552 let res = res.await.unwrap().unwrap();
553 assert_eq!(400, res.status().as_u16());
554 }
555 }
556 }
557
558 #[tokio::test]
559 async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
560 let gzipped_record = {
561 let mut buf = Vec::new();
562 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
563 gz.read_to_end(&mut buf).unwrap();
564 buf
565 };
566
567 for (source_record_compression, record_compression, success, record, expected) in [
568 (
569 Compression::Auto,
570 Compression::Gzip,
571 true,
572 RECORD.as_bytes(),
573 RECORD.as_bytes().to_owned(),
574 ),
575 (
576 Compression::Auto,
577 Compression::None,
578 true,
579 RECORD.as_bytes(),
580 RECORD.as_bytes().to_owned(),
581 ),
582 (
583 Compression::None,
584 Compression::Gzip,
585 true,
586 RECORD.as_bytes(),
587 gzipped_record,
588 ),
589 (
590 Compression::None,
591 Compression::None,
592 true,
593 RECORD.as_bytes(),
594 RECORD.as_bytes().to_owned(),
595 ),
596 (
597 Compression::Gzip,
598 Compression::Gzip,
599 true,
600 RECORD.as_bytes(),
601 RECORD.as_bytes().to_owned(),
602 ),
603 (
604 Compression::Gzip,
605 Compression::None,
606 false,
607 RECORD.as_bytes(),
608 RECORD.as_bytes().to_owned(),
609 ),
610 (
611 Compression::Gzip,
612 Compression::Gzip,
613 true,
614 "".as_bytes(),
615 Vec::new(),
616 ),
617 ] {
618 let (rx, addr) = source(None, None, false, source_record_compression, true, true).await;
619
620 let timestamp: DateTime<Utc> = Utc::now();
621
622 let res = spawn_send(
623 addr,
624 timestamp,
625 vec![record],
626 None,
627 false,
628 record_compression,
629 )
630 .await;
631
632 if success {
633 let events = collect_ready(rx).await;
634
635 let res = res.await.unwrap().unwrap();
636 assert_eq!(200, res.status().as_u16());
637
638 for event in events {
639 let log = event.as_log();
640 let meta = log.metadata();
641
642 assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
644
645 assert_eq!(
647 meta.value().get(path!("vector", "source_type")).unwrap(),
648 &value!("aws_kinesis_firehose")
649 );
650 assert!(meta
651 .value()
652 .get(path!("vector", "ingest_timestamp"))
653 .unwrap()
654 .is_timestamp());
655
656 assert_eq!(
658 meta.value()
659 .get(path!("aws_kinesis_firehose", "request_id"))
660 .unwrap(),
661 &value!(REQUEST_ID)
662 );
663 assert_eq!(
664 meta.value()
665 .get(path!("aws_kinesis_firehose", "source_arn"))
666 .unwrap(),
667 &value!(SOURCE_ARN)
668 );
669 assert_eq!(
670 meta.value()
671 .get(path!("aws_kinesis_firehose", "timestamp"))
672 .unwrap(),
673 &value!(timestamp.trunc_subsecs(3))
674 );
675 }
676
677 let response: models::FirehoseResponse = res.json().await.unwrap();
678 assert_eq!(response.request_id, REQUEST_ID);
679 } else {
680 let res = res.await.unwrap().unwrap();
681 assert_eq!(400, res.status().as_u16());
682 }
683 }
684 }
685
686 #[tokio::test]
687 async fn aws_kinesis_firehose_forwards_events_gzip_request() {
688 assert_source_compliance(&SOURCE_TAGS, async move {
689 let (rx, addr) = source(None, None, false, Default::default(), true, false).await;
690
691 let timestamp: DateTime<Utc> = Utc::now();
692
693 let res = spawn_send(
694 addr,
695 timestamp,
696 vec![RECORD.as_bytes()],
697 None,
698 true,
699 Compression::None,
700 )
701 .await;
702
703 let events = collect_ready(rx).await;
704 let res = res.await.unwrap().unwrap();
705 assert_eq!(200, res.status().as_u16());
706
707 assert_event_data_eq!(
708 events,
709 vec![log_event! {
710 "source_type" => Bytes::from("aws_kinesis_firehose"),
711 "timestamp" => timestamp.trunc_subsecs(3), "message"=> RECORD,
713 "request_id" => REQUEST_ID,
714 "source_arn" => SOURCE_ARN,
715 },]
716 );
717
718 let response: models::FirehoseResponse = res.json().await.unwrap();
719 assert_eq!(response.request_id, REQUEST_ID);
720 })
721 .await;
722 }
723
724 #[tokio::test]
725 async fn aws_kinesis_firehose_rejects_bad_access_key() {
726 let (_rx, addr) = source(
727 Some("an access key".to_string().into()),
728 Some(vec!["an access key in list".to_string().into()]),
729 Default::default(),
730 Default::default(),
731 true,
732 false,
733 )
734 .await;
735
736 let res = send(
737 addr,
738 Utc::now(),
739 vec![],
740 Some("bad access key"),
741 false,
742 Compression::None,
743 )
744 .await
745 .unwrap();
746 assert_eq!(401, res.status().as_u16());
747
748 let response: models::FirehoseResponse = res.json().await.unwrap();
749 assert_eq!(response.request_id, REQUEST_ID);
750 }
751
752 #[tokio::test]
753 async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
754 let (_rx, addr) = source(
755 None,
756 Some(vec!["an access key in list".to_string().into()]),
757 Default::default(),
758 Default::default(),
759 true,
760 false,
761 )
762 .await;
763
764 let res = send(
765 addr,
766 Utc::now(),
767 vec![],
768 Some("bad access key"),
769 false,
770 Compression::None,
771 )
772 .await
773 .unwrap();
774 assert_eq!(401, res.status().as_u16());
775
776 let response: models::FirehoseResponse = res.json().await.unwrap();
777 assert_eq!(response.request_id, REQUEST_ID);
778 }
779
780 #[tokio::test]
781 async fn aws_kinesis_firehose_accepts_merged_access_keys() {
782 let valid_access_key = SensitiveString::from(String::from("an access key in list"));
783
784 let (_rx, addr) = source(
785 Some(valid_access_key.clone()),
786 Some(vec!["valid access key 2".to_string().into()]),
787 Default::default(),
788 Default::default(),
789 true,
790 false,
791 )
792 .await;
793
794 let res = send(
795 addr,
796 Utc::now(),
797 vec![],
798 Some(valid_access_key.clone().inner()),
799 false,
800 Compression::None,
801 )
802 .await
803 .unwrap();
804
805 assert_eq!(200, res.status().as_u16());
806
807 let response: models::FirehoseResponse = res.json().await.unwrap();
808 assert_eq!(response.request_id, REQUEST_ID);
809 }
810
811 #[tokio::test]
812 async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
813 let valid_access_key = "an access key in list".to_string();
814
815 let (_rx, addr) = source(
816 None,
817 Some(vec![
818 valid_access_key.clone().into(),
819 "valid access key 2".to_string().into(),
820 ]),
821 Default::default(),
822 Default::default(),
823 true,
824 false,
825 )
826 .await;
827
828 let res = send(
829 addr,
830 Utc::now(),
831 vec![],
832 Some(&valid_access_key),
833 false,
834 Compression::None,
835 )
836 .await
837 .unwrap();
838
839 assert_eq!(200, res.status().as_u16());
840
841 let response: models::FirehoseResponse = res.json().await.unwrap();
842 assert_eq!(response.request_id, REQUEST_ID);
843 }
844
845 #[tokio::test]
846 async fn handles_acknowledgement_failure() {
847 let expected = RECORD.as_bytes().to_owned();
848
849 let (rx, addr) = source(None, None, false, Compression::None, false, false).await;
850
851 let timestamp: DateTime<Utc> = Utc::now();
852
853 let res = spawn_send(
854 addr,
855 timestamp,
856 vec![RECORD.as_bytes()],
857 None,
858 false,
859 Compression::None,
860 )
861 .await;
862
863 let events = collect_ready(rx).await;
864
865 let res = res.await.unwrap().unwrap();
866 assert_eq!(406, res.status().as_u16());
867
868 assert_event_data_eq!(
869 events,
870 vec![log_event! {
871 "source_type" => Bytes::from("aws_kinesis_firehose"),
872 "timestamp" => timestamp.trunc_subsecs(3), "message"=> Bytes::from(expected),
874 "request_id" => REQUEST_ID,
875 "source_arn" => SOURCE_ARN,
876 },]
877 );
878
879 let response: models::FirehoseResponse = res.json().await.unwrap();
880 assert_eq!(response.request_id, REQUEST_ID);
881 }
882
883 #[tokio::test]
884 async fn event_access_key_passthrough_enabled() {
885 let (rx, address) = source(
886 None,
887 Some(vec!["an access key".to_string().into()]),
888 true,
889 Default::default(),
890 true,
891 true,
892 )
893 .await;
894
895 let timestamp: DateTime<Utc> = Utc::now();
896
897 spawn_send(
898 address,
899 timestamp,
900 vec![RECORD.as_bytes()],
901 Some("an access key"),
902 false,
903 Compression::None,
904 )
905 .await;
906
907 let events = collect_ready(rx).await;
908 let access_key = events[0]
909 .metadata()
910 .secrets()
911 .get("aws_kinesis_firehose_access_key")
912 .unwrap();
913 assert_eq!(access_key.to_string(), "an access key".to_string());
914 }
915
916 #[tokio::test]
917 async fn no_authorization_access_key_passthrough_enabled() {
918 let (rx, address) = source(None, None, true, Default::default(), true, true).await;
919
920 let timestamp: DateTime<Utc> = Utc::now();
921
922 spawn_send(
923 address,
924 timestamp,
925 vec![RECORD.as_bytes()],
926 None,
927 false,
928 Compression::None,
929 )
930 .await;
931
932 let events = collect_ready(rx).await;
933
934 assert!(events[0]
935 .metadata()
936 .secrets()
937 .get("aws_kinesis_firehose_access_key")
938 .is_none());
939 }
940}