1use std::{convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use futures::FutureExt;
4use hyper::{Server, service::make_service_fn};
5use tokio::net::TcpStream;
6use tower::ServiceBuilder;
7use tracing::Span;
8use vector_lib::{
9 codecs::decoding::{DeserializerConfig, FramingConfig},
10 config::{LegacyKey, LogNamespace},
11 configurable::configurable_component,
12 lookup::owned_value_path,
13 sensitive_string::SensitiveString,
14 tls::MaybeTlsIncomingStream,
15};
16use vrl::value::Kind;
17
18use crate::{
19 codecs::DecodingConfig,
20 config::{
21 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
22 SourceOutput,
23 },
24 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
25 serde::{bool_or_struct, default_decoding, default_framing_message_based},
26 tls::{MaybeTlsSettings, TlsEnableableConfig},
27};
28
29pub mod errors;
30mod filters;
31mod handlers;
32mod models;
33
/// Configuration for the `aws_kinesis_firehose` source.
#[configurable_component(source(
    "aws_kinesis_firehose",
    "Collect logs from AWS Kinesis Firehose."
))]
#[derive(Clone, Debug)]
pub struct AwsKinesisFirehoseConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:443"))]
    #[configurable(metadata(docs::examples = "localhost:443"))]
    address: SocketAddr,

    /// An access key to authenticate requests against.
    ///
    /// When set, this key is accepted in addition to any keys listed in `access_keys`.
    #[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    access_key: Option<SensitiveString>,

    /// A list of access keys to authenticate requests against.
    ///
    /// A request presenting a key that is not configured is rejected. When neither
    /// `access_key` nor `access_keys` is set, requests that carry no key are accepted.
    #[configurable(metadata(docs::examples = "access_keys_example()"))]
    access_keys: Option<Vec<SensitiveString>>,

    /// Whether or not to store the access key sent with a request in the event secrets.
    ///
    /// If set to `true` and the incoming request carries an access key, the key is kept
    /// in the event secrets as `aws_kinesis_firehose_access_key`.
    #[configurable(derived)]
    store_access_key: bool,

    /// The compression scheme to use for decompressing records within the Firehose message.
    ///
    /// This applies to the individual records, not to the content encoding of the HTTP
    /// request as a whole.
    #[serde(default)]
    record_compression: Compression,

    // Optional TLS settings for the listener; the description is derived from the type.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Framing used when decoding record payloads; defaults to message-based framing.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    // Deserializer used when decoding record payloads.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // End-to-end acknowledgement settings; accepts either a bare bool or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Connection keepalive settings (maximum connection age and its jitter factor).
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
108
/// Example value for the `access_keys` option.
///
/// Referenced by name from the `docs::examples` metadata on the `access_keys` field.
const fn access_keys_example() -> [&'static str; 2] {
    const FIRST_KEY: &str = "A94A8FE5CCB19BA61C4C08";
    const SECOND_KEY: &str = "B94B8FE5CCB19BA61C4C12";

    [FIRST_KEY, SECOND_KEY]
}
112
/// Compression scheme for records in a Firehose message.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
    /// Automatically attempt to determine the compression scheme.
    ///
    /// Both gzip-compressed and uncompressed records are accepted in this mode.
    #[derivative(Default)]
    Auto,

    /// Uncompressed: record payloads are forwarded as received.
    None,

    /// GZIP: record payloads are expected to be gzip-compressed.
    Gzip,
}
139
140impl fmt::Display for Compression {
141 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
142 match self {
143 Compression::Auto => write!(fmt, "auto"),
144 Compression::None => write!(fmt, "none"),
145 Compression::Gzip => write!(fmt, "gzip"),
146 }
147 }
148}
149
150#[async_trait::async_trait]
151#[typetag::serde(name = "aws_kinesis_firehose")]
152impl SourceConfig for AwsKinesisFirehoseConfig {
153 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
154 let log_namespace = cx.log_namespace(self.log_namespace);
155 let decoder =
156 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
157 .build()?;
158
159 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
160
161 if self.access_key.is_some() {
162 warn!("DEPRECATION `access_key`, use `access_keys` instead.")
163 }
164
165 let access_keys = self
167 .access_keys
168 .iter()
169 .flatten()
170 .chain(self.access_key.iter());
171
172 let svc = filters::firehose(
173 access_keys.map(|key| key.inner().to_string()).collect(),
174 self.store_access_key,
175 self.record_compression,
176 decoder,
177 acknowledgements,
178 cx.out,
179 log_namespace,
180 );
181
182 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
183 let listener = tls.bind(&self.address).await?;
184
185 let keepalive_settings = self.keepalive.clone();
186 let shutdown = cx.shutdown;
187 Ok(Box::pin(async move {
188 let span = Span::current();
189 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
190 let svc = ServiceBuilder::new()
191 .layer(build_http_trace_layer(span.clone()))
192 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
193 MaxConnectionAgeLayer::new(
194 Duration::from_secs(secs),
195 keepalive_settings.max_connection_age_jitter_factor,
196 conn.peer_addr(),
197 )
198 }))
199 .service(warp::service(svc.clone()));
200 futures_util::future::ok::<_, Infallible>(svc)
201 });
202
203 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
204 .serve(make_svc)
205 .with_graceful_shutdown(shutdown.map(|_| ()))
206 .await
207 .map_err(|err| {
208 error!("An error occurred: {:?}.", err);
209 })?;
210
211 Ok(())
212 }))
213 }
214
215 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
216 let schema_definition = self
217 .decoding
218 .schema_definition(global_log_namespace.merge(self.log_namespace))
219 .with_standard_vector_source_metadata()
220 .with_source_metadata(
221 Self::NAME,
222 Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
223 &owned_value_path!("request_id"),
224 Kind::bytes(),
225 None,
226 )
227 .with_source_metadata(
228 Self::NAME,
229 Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
230 &owned_value_path!("source_arn"),
231 Kind::bytes(),
232 None,
233 );
234
235 vec![SourceOutput::new_maybe_logs(
236 self.decoding.output_type(),
237 schema_definition,
238 )]
239 }
240
241 fn resources(&self) -> Vec<Resource> {
242 vec![Resource::tcp(self.address)]
243 }
244
245 fn can_acknowledge(&self) -> bool {
246 true
247 }
248}
249
250impl GenerateConfig for AwsKinesisFirehoseConfig {
251 fn generate_config() -> toml::Value {
252 toml::Value::try_from(Self {
253 address: "0.0.0.0:443".parse().unwrap(),
254 access_key: None,
255 access_keys: None,
256 store_access_key: false,
257 tls: None,
258 record_compression: Default::default(),
259 framing: default_framing_message_based(),
260 decoding: default_decoding(),
261 acknowledgements: Default::default(),
262 log_namespace: None,
263 keepalive: Default::default(),
264 })
265 .unwrap()
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 #![allow(clippy::print_stdout)] use std::{
274 io::{Cursor, Read},
275 net::SocketAddr,
276 };
277
278 use base64::prelude::{BASE64_STANDARD, Engine as _};
279 use bytes::Bytes;
280 use chrono::{DateTime, SubsecRound, Utc};
281 use flate2::read::GzEncoder;
282 use futures::Stream;
283 use similar_asserts::assert_eq;
284 use tokio::time::{Duration, sleep};
285 use vector_lib::{assert_event_data_eq, lookup::path};
286 use vrl::value;
287
288 use super::*;
289 use crate::{
290 SourceSender,
291 event::{Event, EventStatus},
292 log_event,
293 test_util::{
294 addr::{PortGuard, next_addr},
295 collect_ready,
296 components::{SOURCE_TAGS, assert_source_compliance},
297 wait_for_tcp,
298 },
299 };
300
    // Value sent in the `x-amz-firehose-source-arn` header of every test request.
    const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
    // Request id used both in the `x-amz-firehose-request-id` header and in the request body.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
303 const RECORD: &str = r#"
305 {
306 "messageType": "DATA_MESSAGE",
307 "owner": "071959437513",
308 "logGroup": "/jesse/test",
309 "logStream": "test",
310 "subscriptionFilters": ["Destination"],
311 "logEvents": [
312 {
313 "id": "35683658089614582423604394983260738922885519999578275840",
314 "timestamp": 1600110569039,
315 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
316 },
317 {
318 "id": "35683658089659183914001456229543810359430816722590236673",
319 "timestamp": 1600110569041,
320 "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
321 }
322 ]
323 }
324 "#;
325
326 #[test]
327 fn generate_config() {
328 crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
329 }
330
331 async fn source(
332 access_key: Option<SensitiveString>,
333 access_keys: Option<Vec<SensitiveString>>,
334 store_access_key: bool,
335 record_compression: Compression,
336 delivered: bool,
337 log_namespace: bool,
338 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
339 use EventStatus::*;
340 let status = if delivered { Delivered } else { Rejected };
341 let (sender, recv) = SourceSender::new_test_finalize(status);
342 let (_guard, address) = next_addr();
343 let cx = SourceContext::new_test(sender, None);
344 tokio::spawn(async move {
345 AwsKinesisFirehoseConfig {
346 address,
347 tls: None,
348 access_key,
349 access_keys,
350 store_access_key,
351 record_compression,
352 framing: default_framing_message_based(),
353 decoding: default_decoding(),
354 acknowledgements: true.into(),
355 log_namespace: Some(log_namespace),
356 keepalive: Default::default(),
357 }
358 .build(cx)
359 .await
360 .unwrap()
361 .await
362 .unwrap()
363 });
364 wait_for_tcp(address).await;
366 (recv, address, _guard)
367 }
368
369 async fn send(
373 address: SocketAddr,
374 timestamp: DateTime<Utc>,
375 records: Vec<&[u8]>,
376 key: Option<&str>,
377 gzip: bool,
378 record_compression: Compression,
379 ) -> reqwest::Result<reqwest::Response> {
380 let request = models::FirehoseRequest {
381 access_key: key.map(|s| s.to_string()),
382 request_id: REQUEST_ID.to_string(),
383 timestamp,
384 records: records
385 .into_iter()
386 .map(|record| models::EncodedFirehoseRecord {
387 data: encode_record(record, record_compression).unwrap(),
388 })
389 .collect(),
390 };
391
392 let mut builder = reqwest::Client::new()
393 .post(format!("http://{address}"))
394 .header("host", address.to_string())
395 .header(
396 "x-amzn-trace-id",
397 "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
398 )
399 .header("x-amz-firehose-protocol-version", "1.0")
400 .header("x-amz-firehose-request-id", REQUEST_ID.to_string())
401 .header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
402 .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
403 .header("content-type", "application/json");
404
405 if let Some(key) = key {
406 builder = builder.header("x-amz-firehose-access-key", key);
407 }
408
409 if gzip {
410 let mut gz = GzEncoder::new(
411 Cursor::new(serde_json::to_vec(&request).unwrap()),
412 flate2::Compression::fast(),
413 );
414 let mut buffer = Vec::new();
415 gz.read_to_end(&mut buffer).unwrap();
416 builder = builder.header("content-encoding", "gzip").body(buffer);
417 } else {
418 builder = builder.json(&request);
419 }
420
421 builder.send().await
422 }
423
424 async fn spawn_send(
425 address: SocketAddr,
426 timestamp: DateTime<Utc>,
427 records: Vec<&'static [u8]>,
428 key: Option<&'static str>,
429 gzip: bool,
430 record_compression: Compression,
431 ) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
432 let handle = tokio::spawn(async move {
433 send(address, timestamp, records, key, gzip, record_compression).await
434 });
435 sleep(Duration::from_millis(500)).await;
436 handle
437 }
438
439 fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
442 let compressed = match compression {
443 Compression::Auto => panic!("cannot encode records as Auto"),
444 Compression::Gzip => {
445 let mut buffer = Vec::new();
446 if !record.is_empty() {
447 let mut gz = GzEncoder::new(record, flate2::Compression::fast());
448 gz.read_to_end(&mut buffer)?;
449 }
450 buffer
451 }
452 Compression::None => record.to_vec(),
453 };
454
455 Ok(BASE64_STANDARD.encode(compressed))
456 }
457
458 #[tokio::test]
459 async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
460 let gzipped_record = {
461 let mut buf = Vec::new();
462 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
463 gz.read_to_end(&mut buf).unwrap();
464 buf
465 };
466
467 for (source_record_compression, record_compression, success, record, expected) in [
468 (
469 Compression::Auto,
470 Compression::Gzip,
471 true,
472 RECORD.as_bytes(),
473 RECORD.as_bytes().to_owned(),
474 ),
475 (
476 Compression::Auto,
477 Compression::None,
478 true,
479 RECORD.as_bytes(),
480 RECORD.as_bytes().to_owned(),
481 ),
482 (
483 Compression::None,
484 Compression::Gzip,
485 true,
486 RECORD.as_bytes(),
487 gzipped_record,
488 ),
489 (
490 Compression::None,
491 Compression::None,
492 true,
493 RECORD.as_bytes(),
494 RECORD.as_bytes().to_owned(),
495 ),
496 (
497 Compression::Gzip,
498 Compression::Gzip,
499 true,
500 RECORD.as_bytes(),
501 RECORD.as_bytes().to_owned(),
502 ),
503 (
504 Compression::Gzip,
505 Compression::None,
506 false,
507 RECORD.as_bytes(),
508 RECORD.as_bytes().to_owned(),
509 ),
510 (
511 Compression::Gzip,
512 Compression::Gzip,
513 true,
514 "".as_bytes(),
515 Vec::new(),
516 ),
517 ] {
518 let (rx, addr, _guard) =
519 source(None, None, false, source_record_compression, true, false).await;
520
521 let timestamp: DateTime<Utc> = Utc::now();
522
523 let res = spawn_send(
524 addr,
525 timestamp,
526 vec![record],
527 None,
528 false,
529 record_compression,
530 )
531 .await;
532
533 if success {
534 let events = collect_ready(rx).await;
535
536 let res = res.await.unwrap().unwrap();
537 assert_eq!(200, res.status().as_u16());
538
539 assert_event_data_eq!(
540 events,
541 vec![log_event! {
542 "source_type" => Bytes::from("aws_kinesis_firehose"),
543 "timestamp" => timestamp.trunc_subsecs(3), "message" => Bytes::from(expected),
545 "request_id" => REQUEST_ID,
546 "source_arn" => SOURCE_ARN,
547 },]
548 );
549
550 let response: models::FirehoseResponse = res.json().await.unwrap();
551 assert_eq!(response.request_id, REQUEST_ID);
552 } else {
553 let res = res.await.unwrap().unwrap();
554 assert_eq!(400, res.status().as_u16());
555 }
556 }
557 }
558
559 #[tokio::test]
560 async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
561 let gzipped_record = {
562 let mut buf = Vec::new();
563 let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
564 gz.read_to_end(&mut buf).unwrap();
565 buf
566 };
567
568 for (source_record_compression, record_compression, success, record, expected) in [
569 (
570 Compression::Auto,
571 Compression::Gzip,
572 true,
573 RECORD.as_bytes(),
574 RECORD.as_bytes().to_owned(),
575 ),
576 (
577 Compression::Auto,
578 Compression::None,
579 true,
580 RECORD.as_bytes(),
581 RECORD.as_bytes().to_owned(),
582 ),
583 (
584 Compression::None,
585 Compression::Gzip,
586 true,
587 RECORD.as_bytes(),
588 gzipped_record,
589 ),
590 (
591 Compression::None,
592 Compression::None,
593 true,
594 RECORD.as_bytes(),
595 RECORD.as_bytes().to_owned(),
596 ),
597 (
598 Compression::Gzip,
599 Compression::Gzip,
600 true,
601 RECORD.as_bytes(),
602 RECORD.as_bytes().to_owned(),
603 ),
604 (
605 Compression::Gzip,
606 Compression::None,
607 false,
608 RECORD.as_bytes(),
609 RECORD.as_bytes().to_owned(),
610 ),
611 (
612 Compression::Gzip,
613 Compression::Gzip,
614 true,
615 "".as_bytes(),
616 Vec::new(),
617 ),
618 ] {
619 let (rx, addr, _guard) =
620 source(None, None, false, source_record_compression, true, true).await;
621
622 let timestamp: DateTime<Utc> = Utc::now();
623
624 let res = spawn_send(
625 addr,
626 timestamp,
627 vec![record],
628 None,
629 false,
630 record_compression,
631 )
632 .await;
633
634 if success {
635 let events = collect_ready(rx).await;
636
637 let res = res.await.unwrap().unwrap();
638 assert_eq!(200, res.status().as_u16());
639
640 for event in events {
641 let log = event.as_log();
642 let meta = log.metadata();
643
644 assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
646
647 assert_eq!(
649 meta.value().get(path!("vector", "source_type")).unwrap(),
650 &value!("aws_kinesis_firehose")
651 );
652 assert!(
653 meta.value()
654 .get(path!("vector", "ingest_timestamp"))
655 .unwrap()
656 .is_timestamp()
657 );
658
659 assert_eq!(
661 meta.value()
662 .get(path!("aws_kinesis_firehose", "request_id"))
663 .unwrap(),
664 &value!(REQUEST_ID)
665 );
666 assert_eq!(
667 meta.value()
668 .get(path!("aws_kinesis_firehose", "source_arn"))
669 .unwrap(),
670 &value!(SOURCE_ARN)
671 );
672 assert_eq!(
673 meta.value()
674 .get(path!("aws_kinesis_firehose", "timestamp"))
675 .unwrap(),
676 &value!(timestamp.trunc_subsecs(3))
677 );
678 }
679
680 let response: models::FirehoseResponse = res.json().await.unwrap();
681 assert_eq!(response.request_id, REQUEST_ID);
682 } else {
683 let res = res.await.unwrap().unwrap();
684 assert_eq!(400, res.status().as_u16());
685 }
686 }
687 }
688
689 #[tokio::test]
690 async fn aws_kinesis_firehose_forwards_events_gzip_request() {
691 assert_source_compliance(&SOURCE_TAGS, async move {
692 let (rx, addr, _guard) =
693 source(None, None, false, Default::default(), true, false).await;
694
695 let timestamp: DateTime<Utc> = Utc::now();
696
697 let res = spawn_send(
698 addr,
699 timestamp,
700 vec![RECORD.as_bytes()],
701 None,
702 true,
703 Compression::None,
704 )
705 .await;
706
707 let events = collect_ready(rx).await;
708 let res = res.await.unwrap().unwrap();
709 assert_eq!(200, res.status().as_u16());
710
711 assert_event_data_eq!(
712 events,
713 vec![log_event! {
714 "source_type" => Bytes::from("aws_kinesis_firehose"),
715 "timestamp" => timestamp.trunc_subsecs(3), "message"=> RECORD,
717 "request_id" => REQUEST_ID,
718 "source_arn" => SOURCE_ARN,
719 },]
720 );
721
722 let response: models::FirehoseResponse = res.json().await.unwrap();
723 assert_eq!(response.request_id, REQUEST_ID);
724 })
725 .await;
726 }
727
728 #[tokio::test]
729 async fn aws_kinesis_firehose_rejects_bad_access_key() {
730 let (_rx, addr, _guard) = source(
731 Some("an access key".to_string().into()),
732 Some(vec!["an access key in list".to_string().into()]),
733 Default::default(),
734 Default::default(),
735 true,
736 false,
737 )
738 .await;
739
740 let res = send(
741 addr,
742 Utc::now(),
743 vec![],
744 Some("bad access key"),
745 false,
746 Compression::None,
747 )
748 .await
749 .unwrap();
750 assert_eq!(401, res.status().as_u16());
751
752 let response: models::FirehoseResponse = res.json().await.unwrap();
753 assert_eq!(response.request_id, REQUEST_ID);
754 }
755
756 #[tokio::test]
757 async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
758 let (_rx, addr, _guard) = source(
759 None,
760 Some(vec!["an access key in list".to_string().into()]),
761 Default::default(),
762 Default::default(),
763 true,
764 false,
765 )
766 .await;
767
768 let res = send(
769 addr,
770 Utc::now(),
771 vec![],
772 Some("bad access key"),
773 false,
774 Compression::None,
775 )
776 .await
777 .unwrap();
778 assert_eq!(401, res.status().as_u16());
779
780 let response: models::FirehoseResponse = res.json().await.unwrap();
781 assert_eq!(response.request_id, REQUEST_ID);
782 }
783
784 #[tokio::test]
785 async fn aws_kinesis_firehose_accepts_merged_access_keys() {
786 let valid_access_key = SensitiveString::from(String::from("an access key in list"));
787
788 let (_rx, addr, _guard) = source(
789 Some(valid_access_key.clone()),
790 Some(vec!["valid access key 2".to_string().into()]),
791 Default::default(),
792 Default::default(),
793 true,
794 false,
795 )
796 .await;
797
798 let res = send(
799 addr,
800 Utc::now(),
801 vec![],
802 Some(valid_access_key.clone().inner()),
803 false,
804 Compression::None,
805 )
806 .await
807 .unwrap();
808
809 assert_eq!(200, res.status().as_u16());
810
811 let response: models::FirehoseResponse = res.json().await.unwrap();
812 assert_eq!(response.request_id, REQUEST_ID);
813 }
814
815 #[tokio::test]
816 async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
817 let valid_access_key = "an access key in list".to_string();
818
819 let (_rx, addr, _guard) = source(
820 None,
821 Some(vec![
822 valid_access_key.clone().into(),
823 "valid access key 2".to_string().into(),
824 ]),
825 Default::default(),
826 Default::default(),
827 true,
828 false,
829 )
830 .await;
831
832 let res = send(
833 addr,
834 Utc::now(),
835 vec![],
836 Some(&valid_access_key),
837 false,
838 Compression::None,
839 )
840 .await
841 .unwrap();
842
843 assert_eq!(200, res.status().as_u16());
844
845 let response: models::FirehoseResponse = res.json().await.unwrap();
846 assert_eq!(response.request_id, REQUEST_ID);
847 }
848
849 #[tokio::test]
850 async fn handles_acknowledgement_failure() {
851 let expected = RECORD.as_bytes().to_owned();
852
853 let (rx, addr, _guard) = source(None, None, false, Compression::None, false, false).await;
854
855 let timestamp: DateTime<Utc> = Utc::now();
856
857 let res = spawn_send(
858 addr,
859 timestamp,
860 vec![RECORD.as_bytes()],
861 None,
862 false,
863 Compression::None,
864 )
865 .await;
866
867 let events = collect_ready(rx).await;
868
869 let res = res.await.unwrap().unwrap();
870 assert_eq!(406, res.status().as_u16());
871
872 assert_event_data_eq!(
873 events,
874 vec![log_event! {
875 "source_type" => Bytes::from("aws_kinesis_firehose"),
876 "timestamp" => timestamp.trunc_subsecs(3), "message"=> Bytes::from(expected),
878 "request_id" => REQUEST_ID,
879 "source_arn" => SOURCE_ARN,
880 },]
881 );
882
883 let response: models::FirehoseResponse = res.json().await.unwrap();
884 assert_eq!(response.request_id, REQUEST_ID);
885 }
886
887 #[tokio::test]
888 async fn event_access_key_passthrough_enabled() {
889 let (rx, address, _guard) = source(
890 None,
891 Some(vec!["an access key".to_string().into()]),
892 true,
893 Default::default(),
894 true,
895 true,
896 )
897 .await;
898
899 let timestamp: DateTime<Utc> = Utc::now();
900
901 spawn_send(
902 address,
903 timestamp,
904 vec![RECORD.as_bytes()],
905 Some("an access key"),
906 false,
907 Compression::None,
908 )
909 .await;
910
911 let events = collect_ready(rx).await;
912 let access_key = events[0]
913 .metadata()
914 .secrets()
915 .get("aws_kinesis_firehose_access_key")
916 .unwrap();
917 assert_eq!(access_key.to_string(), "an access key".to_string());
918 }
919
920 #[tokio::test]
921 async fn no_authorization_access_key_passthrough_enabled() {
922 let (rx, address, _guard) = source(None, None, true, Default::default(), true, true).await;
923
924 let timestamp: DateTime<Utc> = Utc::now();
925
926 spawn_send(
927 address,
928 timestamp,
929 vec![RECORD.as_bytes()],
930 None,
931 false,
932 Compression::None,
933 )
934 .await;
935
936 let events = collect_ready(rx).await;
937
938 assert!(
939 events[0]
940 .metadata()
941 .secrets()
942 .get("aws_kinesis_firehose_access_key")
943 .is_none()
944 );
945 }
946}