1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9 codecs::decoding::{DeserializerConfig, FramingConfig},
10 config::{LegacyKey, LogNamespace},
11 configurable::configurable_component,
12 lookup::owned_value_path,
13 sensitive_string::SensitiveString,
14 tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19 codecs::DecodingConfig,
20 config::{
21 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22 SourceOutput,
23 },
24 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25 serde::{bool_or_struct, default_decoding, default_framing_message_based},
26 tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
34#[configurable_component(source(
36 "aws_kinesis_firehose",
37 "Collect logs from AWS Kinesis Firehose."
38))]
39#[derive(Clone, Debug)]
40pub struct AwsKinesisFirehoseConfig {
41 #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
43 #[configurable(metadata(docs::examples = "localhost:443"))]
44 address: SocketAddr,
45
46 #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
51 #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
52 access_key: Option<SensitiveString>,
53
54 #[configurable(metadata(docs::examples = "access_keys_example()"))]
59 access_keys: Option<Vec<SensitiveString>>,
60
61 #[configurable(derived)]
66 store_access_key: bool,
67
68 #[serde(default)]
80 record_compression: Compression,
81
82 #[configurable(derived)]
83 tls: Option<TlsEnableableConfig>,
84
85 #[configurable(derived)]
86 #[configurable(metadata(docs::advanced))]
87 #[serde(default = "default_framing_message_based")]
88 framing: FramingConfig,
89
90 #[configurable(derived)]
91 #[configurable(metadata(docs::advanced))]
92 #[serde(default = "default_decoding")]
93 decoding: DeserializerConfig,
94
95 #[configurable(derived)]
96 #[serde(default, deserialize_with = "bool_or_struct")]
97 acknowledgements: SourceAcknowledgementsConfig,
98
99 #[configurable(metadata(docs::hidden))]
101 #[serde(default)]
102 log_namespace: Option<bool>,
103
104 #[configurable(derived)]
105 #[serde(default)]
106 keepalive: KeepaliveConfig,
107}
108
109const fn access_keys_example() -> [&'static str; 2] {
110 ["A94A8FE5CCB19BA61C4C08", "B94B8FE5CCB19BA61C4C12"]
111}
112
113#[configurable_component]
115#[configurable(metadata(docs::advanced))]
116#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
117#[serde(rename_all = "lowercase")]
118#[derivative(Default)]
119pub enum Compression {
120 #[derivative(Default)]
131 Auto,
132
133 None,
135
136 Gzip,
138}
139
140impl fmt::Display for Compression {
141 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
142 match self {
143 Compression::Auto => write!(fmt, "auto"),
144 Compression::None => write!(fmt, "none"),
145 Compression::Gzip => write!(fmt, "gzip"),
146 }
147 }
148}
149
150#[async_trait::async_trait]
151#[typetag::serde(name = "aws_kinesis_firehose")]
152impl SourceConfig for AwsKinesisFirehoseConfig {
153 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
154 let log_namespace = cx.log_namespace(self.log_namespace);
155 let decoder =
156 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
157 .build()?;
158
159 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
160
161 if self.access_key.is_some() {
162 warn!("DEPRECATION `access_key`, use `access_keys` instead.")
163 }
164
165 let access_keys = self
167 .access_keys
168 .iter()
169 .flatten()
170 .chain(self.access_key.iter());
171
172 let svc = filters::firehose(
173 access_keys.map(|key| key.inner().to_string()).collect(),
174 self.store_access_key,
175 self.record_compression,
176 decoder,
177 acknowledgements,
178 cx.out,
179 log_namespace,
180 );
181
182 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
183 let listener = tls.bind(&self.address).await?;
184
185 let keepalive_settings = self.keepalive.clone();
186 let shutdown = cx.shutdown;
187 Ok(Box::pin(async move {
188 let span = Span::current();
189 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
190 let svc = ServiceBuilder::new()
191 .layer(build_http_trace_layer(span.clone()))
192 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
193 MaxConnectionAgeLayer::new(
194 Duration::from_secs(secs),
195 keepalive_settings.max_connection_age_jitter_factor,
196 conn.peer_addr(),
197 )
198 }))
199 .service(warp::service(svc.clone()));
200 futures_util::future::ok::<_, Infallible>(svc)
201 });
202
203 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
204 .serve(make_svc)
205 .with_graceful_shutdown(shutdown.map(|_| ()))
206 .await
207 .map_err(|err| {
208 error!("An error occurred: {:?}.", err);
209 })?;
210
211 Ok(())
212 }))
213 }
214
215 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
216 let schema_definition = self
217 .decoding
218 .schema_definition(global_log_namespace.merge(self.log_namespace))
219 .with_standard_vector_source_metadata()
220 .with_source_metadata(
221 Self::NAME,
222 Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
223 &owned_value_path!("request_id"),
224 Kind::bytes(),
225 None,
226 )
227 .with_source_metadata(
228 Self::NAME,
229 Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
230 &owned_value_path!("source_arn"),
231 Kind::bytes(),
232 None,
233 );
234
235 vec![SourceOutput::new_maybe_logs(
236 self.decoding.output_type(),
237 schema_definition,
238 )]
239 }
240
241 fn resources(&self) -> Vec<Resource> {
242 vec![Resource::tcp(self.address)]
243 }
244
245 fn can_acknowledge(&self) -> bool {
246 true
247 }
248}
249
250impl GenerateConfig for AwsKinesisFirehoseConfig {
251 fn generate_config() -> toml::Value {
252 toml::Value::try_from(Self {
253 address: "0.0.0.0:443".parse().unwrap(),
254 access_key: None,
255 access_keys: None,
256 store_access_key: false,
257 tls: None,
258 record_compression: Default::default(),
259 framing: default_framing_message_based(),
260 decoding: default_decoding(),
261 acknowledgements: Default::default(),
262 log_namespace: None,
263 keepalive: Default::default(),
264 })
265 .unwrap()
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 #![allow(clippy::print_stdout)] use std::{
274 io::{Cursor, Read},
275 net::SocketAddr,
276 };
277
278 use base64::prelude::{BASE64_STANDARD, Engine as _};
279 use bytes::Bytes;
280 use chrono::{DateTime, SubsecRound, Utc};
281 use flate2::read::GzEncoder;
282 use futures::Stream;
283 use similar_asserts::assert_eq;
284 use tokio::time::{Duration, sleep};
285 use vector_lib::{assert_event_data_eq, lookup::path};
286 use vrl::value;
287
288 use super::*;
289 use crate::{
290 SourceSender,
291 event::{Event, EventStatus},
292 log_event,
293 test_util::{
294 collect_ready,
295 components::{SOURCE_TAGS, assert_source_compliance},
296 next_addr, wait_for_tcp,
297 },
298 };
299
300 const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
301 const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
302 const RECORD: &str = r#"
304 {
305 "messageType": "DATA_MESSAGE",
306 "owner": "071959437513",
307 "logGroup": "/jesse/test",
308 "logStream": "test",
309 "subscriptionFilters": ["Destination"],
310 "logEvents": [
311 {
312 "id": "35683658089614582423604394983260738922885519999578275840",
313 "timestamp": 1600110569039,
314 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
315 },
316 {
317 "id": "35683658089659183914001456229543810359430816722590236673",
318 "timestamp": 1600110569041,
319 "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
320 }
321 ]
322 }
323 "#;
324
325 #[test]
326 fn generate_config() {
327 crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
328 }
329
330 async fn source(
331 access_key: Option<SensitiveString>,
332 access_keys: Option<Vec<SensitiveString>>,
333 store_access_key: bool,
334 record_compression: Compression,
335 delivered: bool,
336 log_namespace: bool,
337 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
338 use EventStatus::*;
339 let status = if delivered { Delivered } else { Rejected };
340 let (sender, recv) = SourceSender::new_test_finalize(status);
341 let address = next_addr();
342 let cx = SourceContext::new_test(sender, None);
343 tokio::spawn(async move {
344 AwsKinesisFirehoseConfig {
345 address,
346 tls: None,
347 access_key,
348 access_keys,
349 store_access_key,
350 record_compression,
351 framing: default_framing_message_based(),
352 decoding: default_decoding(),
353 acknowledgements: true.into(),
354 log_namespace: Some(log_namespace),
355 keepalive: Default::default(),
356 }
357 .build(cx)
358 .await
359 .unwrap()
360 .await
361 .unwrap()
362 });
363 wait_for_tcp(address).await;
364 (recv, address)
365 }
366
367 async fn send(
371 address: SocketAddr,
372 timestamp: DateTime<Utc>,
373 records: Vec<&[u8]>,
374 key: Option<&str>,
375 gzip: bool,
376 record_compression: Compression,
377 ) -> reqwest::Result<reqwest::Response> {
378 let request = models::FirehoseRequest {
379 access_key: key.map(|s| s.to_string()),
380 request_id: REQUEST_ID.to_string(),
381 timestamp,
382 records: records
383 .into_iter()
384 .map(|record| models::EncodedFirehoseRecord {
385 data: encode_record(record, record_compression).unwrap(),
386 })
387 .collect(),
388 };
389
390 let mut builder = reqwest::Client::new()
391 .post(format!("http://{address}"))
392 .header("host", address.to_string())
393 .header(
394 "x-amzn-trace-id",
395 "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
396 )
397 .header("x-amz-firehose-protocol-version", "1.0")
398 .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
399 .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
400 .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
401 .header("content-type", "application/json");
402
403 if let Some(key) = key {
404 builder = builder.header("x-amz-firehose-access-key", key);
405 }
406
407 if gzip {
408 let mut gz = GzEncoder::new(
409 Cursor::new(serde_json::to_vec(&request).unwrap()),
410 flate2::Compression::fast(),
411 );
412 let mut buffer = Vec::new();
413 gz.read_to_end(&mut buffer).unwrap();
414 builder = builder.header("content-encoding", "gzip").body(buffer);
415 } else {
416 builder = builder.json(&request);
417 }
418
419 builder.send().await
420 }
421
422 async fn spawn_send(
423 address: SocketAddr,
424 timestamp: DateTime<Utc>,
425 records: Vec<&'static [u8]>,
426 key: Option<&'static str>,
427 gzip: bool,
428 record_compression: Compression,
429 ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
430 let handle = tokio::spawn(async move {
431 send(address, timestamp, records, key, gzip, record_compression).await
432 });
433 sleep(Duration::from_millis(100)).await;
434 handle
435 }
436
437 fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
440 let compressed = match compression {
441 Compression::Auto => panic!("cannot encode records as Auto"),
442 Compression::Gzip => {
443 let mut buffer = Vec::new();
444 if !record.is_empty() {
445 let mut gz = GzEncoder::new(record, flate2::Compression::fast());
446 gz.read_to_end(&mut buffer)?;
447 }
448 buffer
449 }
450 Compression::None => record.to_vec(),
451 };
452
453 Ok(BASE64_STANDARD.encode(compressed))
454 }
455
456 #[tokio::test]
457 async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
458 let gzipped_record = {
459 let mut buf = Vec::new();
460 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
461 gz.read_to_end(&mut buf).unwrap();
462 buf
463 };
464
465 for (source_record_compression, record_compression, success, record, expected) in [
466 (
467 Compression::Auto,
468 Compression::Gzip,
469 true,
470 RECORD.as_bytes(),
471 RECORD.as_bytes().to_owned(),
472 ),
473 (
474 Compression::Auto,
475 Compression::None,
476 true,
477 RECORD.as_bytes(),
478 RECORD.as_bytes().to_owned(),
479 ),
480 (
481 Compression::None,
482 Compression::Gzip,
483 true,
484 RECORD.as_bytes(),
485 gzipped_record,
486 ),
487 (
488 Compression::None,
489 Compression::None,
490 true,
491 RECORD.as_bytes(),
492 RECORD.as_bytes().to_owned(),
493 ),
494 (
495 Compression::Gzip,
496 Compression::Gzip,
497 true,
498 RECORD.as_bytes(),
499 RECORD.as_bytes().to_owned(),
500 ),
501 (
502 Compression::Gzip,
503 Compression::None,
504 false,
505 RECORD.as_bytes(),
506 RECORD.as_bytes().to_owned(),
507 ),
508 (
509 Compression::Gzip,
510 Compression::Gzip,
511 true,
512 "".as_bytes(),
513 Vec::new(),
514 ),
515 ] {
516 let (rx, addr) =
517 source(None, None, false, source_record_compression, true, false).await;
518
519 let timestamp: DateTime<Utc> = Utc::now();
520
521 let res = spawn_send(
522 addr,
523 timestamp,
524 vec![record],
525 None,
526 false,
527 record_compression,
528 )
529 .await;
530
531 if success {
532 let events = collect_ready(rx).await;
533
534 let res = res.await.unwrap().unwrap();
535 assert_eq!(200, res.status().as_u16());
536
537 assert_event_data_eq!(
538 events,
539 vec![log_event! {
540 "source_type" => Bytes::from("aws_kinesis_firehose"),
541 "timestamp" => timestamp.trunc_subsecs(3), "message" => Bytes::from(expected),
543 "request_id" => REQUEST_ID,
544 "source_arn" => SOURCE_ARN,
545 },]
546 );
547
548 let response: models::FirehoseResponse = res.json().await.unwrap();
549 assert_eq!(response.request_id, REQUEST_ID);
550 } else {
551 let res = res.await.unwrap().unwrap();
552 assert_eq!(400, res.status().as_u16());
553 }
554 }
555 }
556
557 #[tokio::test]
558 async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
559 let gzipped_record = {
560 let mut buf = Vec::new();
561 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
562 gz.read_to_end(&mut buf).unwrap();
563 buf
564 };
565
566 for (source_record_compression, record_compression, success, record, expected) in [
567 (
568 Compression::Auto,
569 Compression::Gzip,
570 true,
571 RECORD.as_bytes(),
572 RECORD.as_bytes().to_owned(),
573 ),
574 (
575 Compression::Auto,
576 Compression::None,
577 true,
578 RECORD.as_bytes(),
579 RECORD.as_bytes().to_owned(),
580 ),
581 (
582 Compression::None,
583 Compression::Gzip,
584 true,
585 RECORD.as_bytes(),
586 gzipped_record,
587 ),
588 (
589 Compression::None,
590 Compression::None,
591 true,
592 RECORD.as_bytes(),
593 RECORD.as_bytes().to_owned(),
594 ),
595 (
596 Compression::Gzip,
597 Compression::Gzip,
598 true,
599 RECORD.as_bytes(),
600 RECORD.as_bytes().to_owned(),
601 ),
602 (
603 Compression::Gzip,
604 Compression::None,
605 false,
606 RECORD.as_bytes(),
607 RECORD.as_bytes().to_owned(),
608 ),
609 (
610 Compression::Gzip,
611 Compression::Gzip,
612 true,
613 "".as_bytes(),
614 Vec::new(),
615 ),
616 ] {
617 let (rx, addr) = source(None, None, false, source_record_compression, true, true).await;
618
619 let timestamp: DateTime<Utc> = Utc::now();
620
621 let res = spawn_send(
622 addr,
623 timestamp,
624 vec![record],
625 None,
626 false,
627 record_compression,
628 )
629 .await;
630
631 if success {
632 let events = collect_ready(rx).await;
633
634 let res = res.await.unwrap().unwrap();
635 assert_eq!(200, res.status().as_u16());
636
637 for event in events {
638 let log = event.as_log();
639 let meta = log.metadata();
640
641 assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
643
644 assert_eq!(
646 meta.value().get(path!("vector", "source_type")).unwrap(),
647 &value!("aws_kinesis_firehose")
648 );
649 assert!(
650 meta.value()
651 .get(path!("vector", "ingest_timestamp"))
652 .unwrap()
653 .is_timestamp()
654 );
655
656 assert_eq!(
658 meta.value()
659 .get(path!("aws_kinesis_firehose", "request_id"))
660 .unwrap(),
661 &value!(REQUEST_ID)
662 );
663 assert_eq!(
664 meta.value()
665 .get(path!("aws_kinesis_firehose", "source_arn"))
666 .unwrap(),
667 &value!(SOURCE_ARN)
668 );
669 assert_eq!(
670 meta.value()
671 .get(path!("aws_kinesis_firehose", "timestamp"))
672 .unwrap(),
673 &value!(timestamp.trunc_subsecs(3))
674 );
675 }
676
677 let response: models::FirehoseResponse = res.json().await.unwrap();
678 assert_eq!(response.request_id, REQUEST_ID);
679 } else {
680 let res = res.await.unwrap().unwrap();
681 assert_eq!(400, res.status().as_u16());
682 }
683 }
684 }
685
686 #[tokio::test]
687 async fn aws_kinesis_firehose_forwards_events_gzip_request() {
688 assert_source_compliance(&SOURCE_TAGS, async move {
689 let (rx, addr) = source(None, None, false, Default::default(), true, false).await;
690
691 let timestamp: DateTime<Utc> = Utc::now();
692
693 let res = spawn_send(
694 addr,
695 timestamp,
696 vec![RECORD.as_bytes()],
697 None,
698 true,
699 Compression::None,
700 )
701 .await;
702
703 let events = collect_ready(rx).await;
704 let res = res.await.unwrap().unwrap();
705 assert_eq!(200, res.status().as_u16());
706
707 assert_event_data_eq!(
708 events,
709 vec![log_event! {
710 "source_type" => Bytes::from("aws_kinesis_firehose"),
711 "timestamp" => timestamp.trunc_subsecs(3), "message"=> RECORD,
713 "request_id" => REQUEST_ID,
714 "source_arn" => SOURCE_ARN,
715 },]
716 );
717
718 let response: models::FirehoseResponse = res.json().await.unwrap();
719 assert_eq!(response.request_id, REQUEST_ID);
720 })
721 .await;
722 }
723
724 #[tokio::test]
725 async fn aws_kinesis_firehose_rejects_bad_access_key() {
726 let (_rx, addr) = source(
727 Some("an access key".to_string().into()),
728 Some(vec!["an access key in list".to_string().into()]),
729 Default::default(),
730 Default::default(),
731 true,
732 false,
733 )
734 .await;
735
736 let res = send(
737 addr,
738 Utc::now(),
739 vec![],
740 Some("bad access key"),
741 false,
742 Compression::None,
743 )
744 .await
745 .unwrap();
746 assert_eq!(401, res.status().as_u16());
747
748 let response: models::FirehoseResponse = res.json().await.unwrap();
749 assert_eq!(response.request_id, REQUEST_ID);
750 }
751
752 #[tokio::test]
753 async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
754 let (_rx, addr) = source(
755 None,
756 Some(vec!["an access key in list".to_string().into()]),
757 Default::default(),
758 Default::default(),
759 true,
760 false,
761 )
762 .await;
763
764 let res = send(
765 addr,
766 Utc::now(),
767 vec![],
768 Some("bad access key"),
769 false,
770 Compression::None,
771 )
772 .await
773 .unwrap();
774 assert_eq!(401, res.status().as_u16());
775
776 let response: models::FirehoseResponse = res.json().await.unwrap();
777 assert_eq!(response.request_id, REQUEST_ID);
778 }
779
780 #[tokio::test]
781 async fn aws_kinesis_firehose_accepts_merged_access_keys() {
782 let valid_access_key = SensitiveString::from(String::from("an access key in list"));
783
784 let (_rx, addr) = source(
785 Some(valid_access_key.clone()),
786 Some(vec!["valid access key 2".to_string().into()]),
787 Default::default(),
788 Default::default(),
789 true,
790 false,
791 )
792 .await;
793
794 let res = send(
795 addr,
796 Utc::now(),
797 vec![],
798 Some(valid_access_key.clone().inner()),
799 false,
800 Compression::None,
801 )
802 .await
803 .unwrap();
804
805 assert_eq!(200, res.status().as_u16());
806
807 let response: models::FirehoseResponse = res.json().await.unwrap();
808 assert_eq!(response.request_id, REQUEST_ID);
809 }
810
811 #[tokio::test]
812 async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
813 let valid_access_key = "an access key in list".to_string();
814
815 let (_rx, addr) = source(
816 None,
817 Some(vec![
818 valid_access_key.clone().into(),
819 "valid access key 2".to_string().into(),
820 ]),
821 Default::default(),
822 Default::default(),
823 true,
824 false,
825 )
826 .await;
827
828 let res = send(
829 addr,
830 Utc::now(),
831 vec![],
832 Some(&valid_access_key),
833 false,
834 Compression::None,
835 )
836 .await
837 .unwrap();
838
839 assert_eq!(200, res.status().as_u16());
840
841 let response: models::FirehoseResponse = res.json().await.unwrap();
842 assert_eq!(response.request_id, REQUEST_ID);
843 }
844
845 #[tokio::test]
846 async fn handles_acknowledgement_failure() {
847 let expected = RECORD.as_bytes().to_owned();
848
849 let (rx, addr) = source(None, None, false, Compression::None, false, false).await;
850
851 let timestamp: DateTime<Utc> = Utc::now();
852
853 let res = spawn_send(
854 addr,
855 timestamp,
856 vec![RECORD.as_bytes()],
857 None,
858 false,
859 Compression::None,
860 )
861 .await;
862
863 let events = collect_ready(rx).await;
864
865 let res = res.await.unwrap().unwrap();
866 assert_eq!(406, res.status().as_u16());
867
868 assert_event_data_eq!(
869 events,
870 vec![log_event! {
871 "source_type" => Bytes::from("aws_kinesis_firehose"),
872 "timestamp" => timestamp.trunc_subsecs(3), "message"=> Bytes::from(expected),
874 "request_id" => REQUEST_ID,
875 "source_arn" => SOURCE_ARN,
876 },]
877 );
878
879 let response: models::FirehoseResponse = res.json().await.unwrap();
880 assert_eq!(response.request_id, REQUEST_ID);
881 }
882
883 #[tokio::test]
884 async fn event_access_key_passthrough_enabled() {
885 let (rx, address) = source(
886 None,
887 Some(vec!["an access key".to_string().into()]),
888 true,
889 Default::default(),
890 true,
891 true,
892 )
893 .await;
894
895 let timestamp: DateTime<Utc> = Utc::now();
896
897 spawn_send(
898 address,
899 timestamp,
900 vec![RECORD.as_bytes()],
901 Some("an access key"),
902 false,
903 Compression::None,
904 )
905 .await;
906
907 let events = collect_ready(rx).await;
908 let access_key = events[0]
909 .metadata()
910 .secrets()
911 .get("aws_kinesis_firehose_access_key")
912 .unwrap();
913 assert_eq!(access_key.to_string(), "an access key".to_string());
914 }
915
916 #[tokio::test]
917 async fn no_authorization_access_key_passthrough_enabled() {
918 let (rx, address) = source(None, None, true, Default::default(), true, true).await;
919
920 let timestamp: DateTime<Utc> = Utc::now();
921
922 spawn_send(
923 address,
924 timestamp,
925 vec![RECORD.as_bytes()],
926 None,
927 false,
928 Compression::None,
929 )
930 .await;
931
932 let events = collect_ready(rx).await;
933
934 assert!(
935 events[0]
936 .metadata()
937 .secrets()
938 .get("aws_kinesis_firehose_access_key")
939 .is_none()
940 );
941 }
942}