1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9 codecs::decoding::{DeserializerConfig, FramingConfig},
10 config::{LegacyKey, LogNamespace},
11 configurable::configurable_component,
12 lookup::owned_value_path,
13 sensitive_string::SensitiveString,
14 tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19 codecs::DecodingConfig,
20 config::{
21 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22 SourceOutput,
23 },
24 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25 serde::{bool_or_struct, default_decoding, default_framing_message_based},
26 tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
34#[configurable_component(source(
36 "aws_kinesis_firehose",
37 "Collect logs from AWS Kinesis Firehose."
38))]
39#[derive(Clone, Debug)]
40pub struct AwsKinesisFirehoseConfig {
41 #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
43 #[configurable(metadata(docs::examples = "localhost:443"))]
44 address: SocketAddr,
45
46 #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
51 #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
52 access_key: Option<SensitiveString>,
53
54 #[configurable(metadata(docs::examples = "access_keys_example()"))]
59 access_keys: Option<Vec<SensitiveString>>,
60
61 #[configurable(derived)]
66 store_access_key: bool,
67
68 #[serde(default)]
80 record_compression: Compression,
81
82 #[configurable(derived)]
83 tls: Option<TlsEnableableConfig>,
84
85 #[configurable(derived)]
86 #[configurable(metadata(docs::advanced))]
87 #[serde(default = "default_framing_message_based")]
88 framing: FramingConfig,
89
90 #[configurable(derived)]
91 #[configurable(metadata(docs::advanced))]
92 #[serde(default = "default_decoding")]
93 decoding: DeserializerConfig,
94
95 #[configurable(derived)]
96 #[serde(default, deserialize_with = "bool_or_struct")]
97 acknowledgements: SourceAcknowledgementsConfig,
98
99 #[configurable(metadata(docs::hidden))]
101 #[serde(default)]
102 log_namespace: Option<bool>,
103
104 #[configurable(derived)]
105 #[serde(default)]
106 keepalive: KeepaliveConfig,
107}
108
/// Example value shown in the generated documentation for `access_keys`.
const fn access_keys_example() -> [&'static str; 2] {
    const EXAMPLE_KEYS: [&str; 2] = ["A94A8FE5CCB19BA61C4C08", "B94B8FE5CCB19BA61C4C12"];
    EXAMPLE_KEYS
}
112
113#[configurable_component]
115#[configurable(metadata(docs::advanced))]
116#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
117#[serde(rename_all = "lowercase")]
118pub enum Compression {
119 #[default]
130 Auto,
131
132 None,
134
135 Gzip,
137}
138
139impl fmt::Display for Compression {
140 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
141 match self {
142 Compression::Auto => write!(fmt, "auto"),
143 Compression::None => write!(fmt, "none"),
144 Compression::Gzip => write!(fmt, "gzip"),
145 }
146 }
147}
148
149#[async_trait::async_trait]
150#[typetag::serde(name = "aws_kinesis_firehose")]
151impl SourceConfig for AwsKinesisFirehoseConfig {
152 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
153 let log_namespace = cx.log_namespace(self.log_namespace);
154 let decoder =
155 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
156 .build()?;
157
158 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
159
160 if self.access_key.is_some() {
161 warn!("DEPRECATION `access_key`, use `access_keys` instead.")
162 }
163
164 let access_keys = self
166 .access_keys
167 .iter()
168 .flatten()
169 .chain(self.access_key.iter());
170
171 let svc = filters::firehose(
172 access_keys.map(|key| key.inner().to_string()).collect(),
173 self.store_access_key,
174 self.record_compression,
175 decoder,
176 acknowledgements,
177 cx.out,
178 log_namespace,
179 );
180
181 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
182 let listener = tls.bind(&self.address).await?;
183
184 let keepalive_settings = self.keepalive.clone();
185 let shutdown = cx.shutdown;
186 Ok(Box::pin(async move {
187 let span = Span::current();
188 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
189 let svc = ServiceBuilder::new()
190 .layer(build_http_trace_layer(span.clone()))
191 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
192 MaxConnectionAgeLayer::new(
193 Duration::from_secs(secs),
194 keepalive_settings.max_connection_age_jitter_factor,
195 conn.peer_addr(),
196 )
197 }))
198 .service(warp::service(svc.clone()));
199 futures_util::future::ok::<_, Infallible>(svc)
200 });
201
202 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
203 .serve(make_svc)
204 .with_graceful_shutdown(shutdown.map(|_| ()))
205 .await
206 .map_err(|err| {
207 error!("An error occurred: {:?}.", err);
208 })?;
209
210 Ok(())
211 }))
212 }
213
214 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
215 let schema_definition = self
216 .decoding
217 .schema_definition(global_log_namespace.merge(self.log_namespace))
218 .with_standard_vector_source_metadata()
219 .with_source_metadata(
220 Self::NAME,
221 Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
222 &owned_value_path!("request_id"),
223 Kind::bytes(),
224 None,
225 )
226 .with_source_metadata(
227 Self::NAME,
228 Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
229 &owned_value_path!("source_arn"),
230 Kind::bytes(),
231 None,
232 );
233
234 vec![SourceOutput::new_maybe_logs(
235 self.decoding.output_type(),
236 schema_definition,
237 )]
238 }
239
240 fn resources(&self) -> Vec<Resource> {
241 vec![Resource::tcp(self.address)]
242 }
243
244 fn can_acknowledge(&self) -> bool {
245 true
246 }
247}
248
249impl GenerateConfig for AwsKinesisFirehoseConfig {
250 fn generate_config() -> toml::Value {
251 toml::Value::try_from(Self {
252 address: "0.0.0.0:443".parse().unwrap(),
253 access_key: None,
254 access_keys: None,
255 store_access_key: false,
256 tls: None,
257 record_compression: Default::default(),
258 framing: default_framing_message_based(),
259 decoding: default_decoding(),
260 acknowledgements: Default::default(),
261 log_namespace: None,
262 keepalive: Default::default(),
263 })
264 .unwrap()
265 }
266}
267
268#[cfg(test)]
269mod tests {
270 #![allow(clippy::print_stdout)] use std::{
273 io::{Cursor, Read},
274 net::SocketAddr,
275 };
276
277 use base64::prelude::{BASE64_STANDARD, Engine as _};
278 use bytes::Bytes;
279 use chrono::{DateTime, SubsecRound, Utc};
280 use flate2::read::GzEncoder;
281 use futures::Stream;
282 use similar_asserts::assert_eq;
283 use tokio::time::{Duration, sleep};
284 use vector_lib::{assert_event_data_eq, lookup::path};
285 use vrl::value;
286
287 use super::*;
288 use crate::{
289 SourceSender,
290 event::{Event, EventStatus},
291 log_event,
292 test_util::{
293 addr::{PortGuard, next_addr},
294 collect_ready,
295 components::{SOURCE_TAGS, assert_source_compliance},
296 wait_for_tcp,
297 },
298 };
299
    /// Source ARN sent in the `x-amz-firehose-source-arn` header of every test request.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    /// Request ID sent both in the request body and in the `x-amz-firehose-request-id` header.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
302 const RECORD: &str = r#"
304 {
305 "messageType": "DATA_MESSAGE",
306 "owner": "071959437513",
307 "logGroup": "/jesse/test",
308 "logStream": "test",
309 "subscriptionFilters": ["Destination"],
310 "logEvents": [
311 {
312 "id": "35683658089614582423604394983260738922885519999578275840",
313 "timestamp": 1600110569039,
314 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
315 },
316 {
317 "id": "35683658089659183914001456229543810359430816722590236673",
318 "timestamp": 1600110569041,
319 "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
320 }
321 ]
322 }
323 "#;
324
325 #[test]
326 fn generate_config() {
327 crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
328 }
329
330 async fn source(
331 access_key: Option<SensitiveString>,
332 access_keys: Option<Vec<SensitiveString>>,
333 store_access_key: bool,
334 record_compression: Compression,
335 delivered: bool,
336 log_namespace: bool,
337 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
338 use EventStatus::*;
339 let status = if delivered { Delivered } else { Rejected };
340 let (sender, recv) = SourceSender::new_test_finalize(status);
341 let (_guard, address) = next_addr();
342 let cx = SourceContext::new_test(sender, None);
343 tokio::spawn(async move {
344 AwsKinesisFirehoseConfig {
345 address,
346 tls: None,
347 access_key,
348 access_keys,
349 store_access_key,
350 record_compression,
351 framing: default_framing_message_based(),
352 decoding: default_decoding(),
353 acknowledgements: true.into(),
354 log_namespace: Some(log_namespace),
355 keepalive: Default::default(),
356 }
357 .build(cx)
358 .await
359 .unwrap()
360 .await
361 .unwrap()
362 });
363 wait_for_tcp(address).await;
365 (recv, address, _guard)
366 }
367
368 async fn send(
372 address: SocketAddr,
373 timestamp: DateTime<Utc>,
374 records: Vec<&[u8]>,
375 key: Option<&str>,
376 gzip: bool,
377 record_compression: Compression,
378 ) -> reqwest::Result<reqwest::Response> {
379 let request = models::FirehoseRequest {
380 access_key: key.map(|s| s.to_string()),
381 request_id: REQUEST_ID.to_string(),
382 timestamp,
383 records: records
384 .into_iter()
385 .map(|record| models::EncodedFirehoseRecord {
386 data: encode_record(record, record_compression).unwrap(),
387 })
388 .collect(),
389 };
390
391 let mut builder = reqwest::Client::new()
392 .post(format!("http://{address}"))
393 .header("host", address.to_string())
394 .header(
395 "x-amzn-trace-id",
396 "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
397 )
398 .header("x-amz-firehose-protocol-version", "1.0")
399 .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
400 .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
401 .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
402 .header("content-type", "application/json");
403
404 if let Some(key) = key {
405 builder = builder.header("x-amz-firehose-access-key", key);
406 }
407
408 if gzip {
409 let mut gz = GzEncoder::new(
410 Cursor::new(serde_json::to_vec(&request).unwrap()),
411 flate2::Compression::fast(),
412 );
413 let mut buffer = Vec::new();
414 gz.read_to_end(&mut buffer).unwrap();
415 builder = builder.header("content-encoding", "gzip").body(buffer);
416 } else {
417 builder = builder.json(&request);
418 }
419
420 builder.send().await
421 }
422
423 async fn spawn_send(
424 address: SocketAddr,
425 timestamp: DateTime<Utc>,
426 records: Vec<&'static [u8]>,
427 key: Option<&'static str>,
428 gzip: bool,
429 record_compression: Compression,
430 ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
431 let handle = tokio::spawn(async move {
432 send(address, timestamp, records, key, gzip, record_compression).await
433 });
434 sleep(Duration::from_millis(500)).await;
435 handle
436 }
437
438 fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
441 let compressed = match compression {
442 Compression::Auto => panic!("cannot encode records as Auto"),
443 Compression::Gzip => {
444 let mut buffer = Vec::new();
445 if !record.is_empty() {
446 let mut gz = GzEncoder::new(record, flate2::Compression::fast());
447 gz.read_to_end(&mut buffer)?;
448 }
449 buffer
450 }
451 Compression::None => record.to_vec(),
452 };
453
454 Ok(BASE64_STANDARD.encode(compressed))
455 }
456
457 #[tokio::test]
458 async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
459 let gzipped_record = {
460 let mut buf = Vec::new();
461 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
462 gz.read_to_end(&mut buf).unwrap();
463 buf
464 };
465
466 for (source_record_compression, record_compression, success, record, expected) in [
467 (
468 Compression::Auto,
469 Compression::Gzip,
470 true,
471 RECORD.as_bytes(),
472 RECORD.as_bytes().to_owned(),
473 ),
474 (
475 Compression::Auto,
476 Compression::None,
477 true,
478 RECORD.as_bytes(),
479 RECORD.as_bytes().to_owned(),
480 ),
481 (
482 Compression::None,
483 Compression::Gzip,
484 true,
485 RECORD.as_bytes(),
486 gzipped_record,
487 ),
488 (
489 Compression::None,
490 Compression::None,
491 true,
492 RECORD.as_bytes(),
493 RECORD.as_bytes().to_owned(),
494 ),
495 (
496 Compression::Gzip,
497 Compression::Gzip,
498 true,
499 RECORD.as_bytes(),
500 RECORD.as_bytes().to_owned(),
501 ),
502 (
503 Compression::Gzip,
504 Compression::None,
505 false,
506 RECORD.as_bytes(),
507 RECORD.as_bytes().to_owned(),
508 ),
509 (
510 Compression::Gzip,
511 Compression::Gzip,
512 true,
513 "".as_bytes(),
514 Vec::new(),
515 ),
516 ] {
517 let (rx, addr, _guard) =
518 source(None, None, false, source_record_compression, true, false).await;
519
520 let timestamp: DateTime<Utc> = Utc::now();
521
522 let res = spawn_send(
523 addr,
524 timestamp,
525 vec![record],
526 None,
527 false,
528 record_compression,
529 )
530 .await;
531
532 if success {
533 let events = collect_ready(rx).await;
534
535 let res = res.await.unwrap().unwrap();
536 assert_eq!(200, res.status().as_u16());
537
538 assert_event_data_eq!(
539 events,
540 vec![log_event! {
541 "source_type" => Bytes::from("aws_kinesis_firehose"),
542 "timestamp" => timestamp.trunc_subsecs(3), "message" => Bytes::from(expected),
544 "request_id" => REQUEST_ID,
545 "source_arn" => SOURCE_ARN,
546 },]
547 );
548
549 let response: models::FirehoseResponse = res.json().await.unwrap();
550 assert_eq!(response.request_id, REQUEST_ID);
551 } else {
552 let res = res.await.unwrap().unwrap();
553 assert_eq!(400, res.status().as_u16());
554 }
555 }
556 }
557
558 #[tokio::test]
559 async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
560 let gzipped_record = {
561 let mut buf = Vec::new();
562 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
563 gz.read_to_end(&mut buf).unwrap();
564 buf
565 };
566
567 for (source_record_compression, record_compression, success, record, expected) in [
568 (
569 Compression::Auto,
570 Compression::Gzip,
571 true,
572 RECORD.as_bytes(),
573 RECORD.as_bytes().to_owned(),
574 ),
575 (
576 Compression::Auto,
577 Compression::None,
578 true,
579 RECORD.as_bytes(),
580 RECORD.as_bytes().to_owned(),
581 ),
582 (
583 Compression::None,
584 Compression::Gzip,
585 true,
586 RECORD.as_bytes(),
587 gzipped_record,
588 ),
589 (
590 Compression::None,
591 Compression::None,
592 true,
593 RECORD.as_bytes(),
594 RECORD.as_bytes().to_owned(),
595 ),
596 (
597 Compression::Gzip,
598 Compression::Gzip,
599 true,
600 RECORD.as_bytes(),
601 RECORD.as_bytes().to_owned(),
602 ),
603 (
604 Compression::Gzip,
605 Compression::None,
606 false,
607 RECORD.as_bytes(),
608 RECORD.as_bytes().to_owned(),
609 ),
610 (
611 Compression::Gzip,
612 Compression::Gzip,
613 true,
614 "".as_bytes(),
615 Vec::new(),
616 ),
617 ] {
618 let (rx, addr, _guard) =
619 source(None, None, false, source_record_compression, true, true).await;
620
621 let timestamp: DateTime<Utc> = Utc::now();
622
623 let res = spawn_send(
624 addr,
625 timestamp,
626 vec![record],
627 None,
628 false,
629 record_compression,
630 )
631 .await;
632
633 if success {
634 let events = collect_ready(rx).await;
635
636 let res = res.await.unwrap().unwrap();
637 assert_eq!(200, res.status().as_u16());
638
639 for event in events {
640 let log = event.as_log();
641 let meta = log.metadata();
642
643 assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
645
646 assert_eq!(
648 meta.value().get(path!("vector", "source_type")).unwrap(),
649 &value!("aws_kinesis_firehose")
650 );
651 assert!(
652 meta.value()
653 .get(path!("vector", "ingest_timestamp"))
654 .unwrap()
655 .is_timestamp()
656 );
657
658 assert_eq!(
660 meta.value()
661 .get(path!("aws_kinesis_firehose", "request_id"))
662 .unwrap(),
663 &value!(REQUEST_ID)
664 );
665 assert_eq!(
666 meta.value()
667 .get(path!("aws_kinesis_firehose", "source_arn"))
668 .unwrap(),
669 &value!(SOURCE_ARN)
670 );
671 assert_eq!(
672 meta.value()
673 .get(path!("aws_kinesis_firehose", "timestamp"))
674 .unwrap(),
675 &value!(timestamp.trunc_subsecs(3))
676 );
677 }
678
679 let response: models::FirehoseResponse = res.json().await.unwrap();
680 assert_eq!(response.request_id, REQUEST_ID);
681 } else {
682 let res = res.await.unwrap().unwrap();
683 assert_eq!(400, res.status().as_u16());
684 }
685 }
686 }
687
688 #[tokio::test]
689 async fn aws_kinesis_firehose_forwards_events_gzip_request() {
690 assert_source_compliance(&SOURCE_TAGS, async move {
691 let (rx, addr, _guard) =
692 source(None, None, false, Default::default(), true, false).await;
693
694 let timestamp: DateTime<Utc> = Utc::now();
695
696 let res = spawn_send(
697 addr,
698 timestamp,
699 vec![RECORD.as_bytes()],
700 None,
701 true,
702 Compression::None,
703 )
704 .await;
705
706 let events = collect_ready(rx).await;
707 let res = res.await.unwrap().unwrap();
708 assert_eq!(200, res.status().as_u16());
709
710 assert_event_data_eq!(
711 events,
712 vec![log_event! {
713 "source_type" => Bytes::from("aws_kinesis_firehose"),
714 "timestamp" => timestamp.trunc_subsecs(3), "message"=> RECORD,
716 "request_id" => REQUEST_ID,
717 "source_arn" => SOURCE_ARN,
718 },]
719 );
720
721 let response: models::FirehoseResponse = res.json().await.unwrap();
722 assert_eq!(response.request_id, REQUEST_ID);
723 })
724 .await;
725 }
726
727 #[tokio::test]
728 async fn aws_kinesis_firehose_rejects_bad_access_key() {
729 let (_rx, addr, _guard) = source(
730 Some("an access key".to_string().into()),
731 Some(vec!["an access key in list".to_string().into()]),
732 Default::default(),
733 Default::default(),
734 true,
735 false,
736 )
737 .await;
738
739 let res = send(
740 addr,
741 Utc::now(),
742 vec![],
743 Some("bad access key"),
744 false,
745 Compression::None,
746 )
747 .await
748 .unwrap();
749 assert_eq!(401, res.status().as_u16());
750
751 let response: models::FirehoseResponse = res.json().await.unwrap();
752 assert_eq!(response.request_id, REQUEST_ID);
753 }
754
755 #[tokio::test]
756 async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
757 let (_rx, addr, _guard) = source(
758 None,
759 Some(vec!["an access key in list".to_string().into()]),
760 Default::default(),
761 Default::default(),
762 true,
763 false,
764 )
765 .await;
766
767 let res = send(
768 addr,
769 Utc::now(),
770 vec![],
771 Some("bad access key"),
772 false,
773 Compression::None,
774 )
775 .await
776 .unwrap();
777 assert_eq!(401, res.status().as_u16());
778
779 let response: models::FirehoseResponse = res.json().await.unwrap();
780 assert_eq!(response.request_id, REQUEST_ID);
781 }
782
783 #[tokio::test]
784 async fn aws_kinesis_firehose_accepts_merged_access_keys() {
785 let valid_access_key = SensitiveString::from(String::from("an access key in list"));
786
787 let (_rx, addr, _guard) = source(
788 Some(valid_access_key.clone()),
789 Some(vec!["valid access key 2".to_string().into()]),
790 Default::default(),
791 Default::default(),
792 true,
793 false,
794 )
795 .await;
796
797 let res = send(
798 addr,
799 Utc::now(),
800 vec![],
801 Some(valid_access_key.clone().inner()),
802 false,
803 Compression::None,
804 )
805 .await
806 .unwrap();
807
808 assert_eq!(200, res.status().as_u16());
809
810 let response: models::FirehoseResponse = res.json().await.unwrap();
811 assert_eq!(response.request_id, REQUEST_ID);
812 }
813
814 #[tokio::test]
815 async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
816 let valid_access_key = "an access key in list".to_string();
817
818 let (_rx, addr, _guard) = source(
819 None,
820 Some(vec![
821 valid_access_key.clone().into(),
822 "valid access key 2".to_string().into(),
823 ]),
824 Default::default(),
825 Default::default(),
826 true,
827 false,
828 )
829 .await;
830
831 let res = send(
832 addr,
833 Utc::now(),
834 vec![],
835 Some(&valid_access_key),
836 false,
837 Compression::None,
838 )
839 .await
840 .unwrap();
841
842 assert_eq!(200, res.status().as_u16());
843
844 let response: models::FirehoseResponse = res.json().await.unwrap();
845 assert_eq!(response.request_id, REQUEST_ID);
846 }
847
848 #[tokio::test]
849 async fn handles_acknowledgement_failure() {
850 let expected = RECORD.as_bytes().to_owned();
851
852 let (rx, addr, _guard) = source(None, None, false, Compression::None, false, false).await;
853
854 let timestamp: DateTime<Utc> = Utc::now();
855
856 let res = spawn_send(
857 addr,
858 timestamp,
859 vec![RECORD.as_bytes()],
860 None,
861 false,
862 Compression::None,
863 )
864 .await;
865
866 let events = collect_ready(rx).await;
867
868 let res = res.await.unwrap().unwrap();
869 assert_eq!(406, res.status().as_u16());
870
871 assert_event_data_eq!(
872 events,
873 vec![log_event! {
874 "source_type" => Bytes::from("aws_kinesis_firehose"),
875 "timestamp" => timestamp.trunc_subsecs(3), "message"=> Bytes::from(expected),
877 "request_id" => REQUEST_ID,
878 "source_arn" => SOURCE_ARN,
879 },]
880 );
881
882 let response: models::FirehoseResponse = res.json().await.unwrap();
883 assert_eq!(response.request_id, REQUEST_ID);
884 }
885
886 #[tokio::test]
887 async fn event_access_key_passthrough_enabled() {
888 let (rx, address, _guard) = source(
889 None,
890 Some(vec!["an access key".to_string().into()]),
891 true,
892 Default::default(),
893 true,
894 true,
895 )
896 .await;
897
898 let timestamp: DateTime<Utc> = Utc::now();
899
900 spawn_send(
901 address,
902 timestamp,
903 vec![RECORD.as_bytes()],
904 Some("an access key"),
905 false,
906 Compression::None,
907 )
908 .await;
909
910 let events = collect_ready(rx).await;
911 let access_key = events[0]
912 .metadata()
913 .secrets()
914 .get("aws_kinesis_firehose_access_key")
915 .unwrap();
916 assert_eq!(access_key.to_string(), "an access key".to_string());
917 }
918
919 #[tokio::test]
920 async fn no_authorization_access_key_passthrough_enabled() {
921 let (rx, address, _guard) = source(None, None, true, Default::default(), true, true).await;
922
923 let timestamp: DateTime<Utc> = Utc::now();
924
925 spawn_send(
926 address,
927 timestamp,
928 vec![RECORD.as_bytes()],
929 None,
930 false,
931 Compression::None,
932 )
933 .await;
934
935 let events = collect_ready(rx).await;
936
937 assert!(
938 events[0]
939 .metadata()
940 .secrets()
941 .get("aws_kinesis_firehose_access_key")
942 .is_none()
943 );
944 }
945}