use std::time::Duration;
use std::{convert::Infallible, fmt, net::SocketAddr};
use futures::FutureExt;
use hyper::{service::make_service_fn, Server};
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::owned_value_path;
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::tls::MaybeTlsIncomingStream;
use vrl::value::Kind;
use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
use crate::{
codecs::DecodingConfig,
config::{
GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
SourceOutput,
},
http::build_http_trace_layer,
serde::{bool_or_struct, default_decoding, default_framing_message_based},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};
pub mod errors;
mod filters;
mod handlers;
mod models;
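
/// Configuration for the `aws_kinesis_firehose` source.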
#[configurable_component(source(
"aws_kinesis_firehose",
"Collect logs from AWS Kinesis Firehose."
))]
#[derive(Clone, Debug)]
pub struct AwsKinesisFirehoseConfig {
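    /// The socket address to listen for connections on.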
#[configurable(metadata(docs::examples = "0.0.0.0:443"))]
#[configurable(metadata(docs::examples = "localhost:443"))]
address: SocketAddr,
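
    /// An access key to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request.
    /// If configured, requests whose `x-amz-firehose-access-key` header does not match this value are rejected.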
#[configurable(deprecated = "This option has been deprecated, use `access_keys` instead.")]
#[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
access_key: Option<SensitiveString>,
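
    /// A list of access keys to authenticate requests against.
    ///
    /// AWS Kinesis Firehose can be configured to pass along a user-configurable access key with each request.
    /// If configured, requests whose `x-amz-firehose-access-key` header does not match one of these values are
    /// rejected.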
#[configurable(metadata(docs::examples = "access_keys_example()"))]
access_keys: Option<Vec<SensitiveString>>,
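
    /// Whether to store the access key sent with the request in the event secrets as
    /// `"aws_kinesis_firehose_access_key"`.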
#[configurable(derived)]
store_access_key: bool,
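
    /// The compression scheme to use for decompressing records within the Firehose message.
    ///
    /// Some services, such as AWS CloudWatch Logs, gzip-compress records before sending them to Firehose;
    /// `auto` attempts to detect the scheme and otherwise treats records as uncompressed.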
#[serde(default)]
record_compression: Compression,
#[configurable(derived)]
tls: Option<TlsEnableableConfig>,
#[configurable(derived)]
#[configurable(metadata(docs::advanced))]
#[serde(default = "default_framing_message_based")]
framing: FramingConfig,
#[configurable(derived)]
#[configurable(metadata(docs::advanced))]
#[serde(default = "default_decoding")]
decoding: DeserializerConfig,
#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: SourceAcknowledgementsConfig,
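
    /// The namespace to use for logs. This overrides the global setting.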
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
#[configurable(derived)]
#[serde(default)]
keepalive: KeepaliveConfig,
}
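
/// Example values shown in the generated documentation for `access_keys`.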
const fn access_keys_example() -> [&'static str; 2] {
["A94A8FE5CCB19BA61C4C08", "B94B8FE5CCB19BA61C4C12"]
}
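
/// Compression scheme of the records delivered in a Firehose message.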
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[derivative(Default)]
pub enum Compression {
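    /// Automatically attempt to determine the compression scheme.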
#[derivative(Default)]
Auto,
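
    /// Uncompressed.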
None,
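
    /// GZIP.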
Gzip,
}
impl fmt::Display for Compression {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
Compression::Auto => write!(fmt, "auto"),
Compression::None => write!(fmt, "none"),
Compression::Gzip => write!(fmt, "gzip"),
}
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "aws_kinesis_firehose")]
impl SourceConfig for AwsKinesisFirehoseConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
        if self.access_key.is_some() {
            warn!("DEPRECATION: the `access_key` option is deprecated. Use `access_keys` instead.");
        }
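
        // Merge the deprecated `access_key` option into the list of accepted `access_keys`.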
let access_keys = self
.access_keys
.iter()
.flatten()
.chain(self.access_key.iter());
let svc = filters::firehose(
access_keys.map(|key| key.inner().to_string()).collect(),
self.store_access_key,
self.record_compression,
decoder,
acknowledgements,
cx.out,
log_namespace,
);
let tls = MaybeTlsSettings::from_config(&self.tls, true)?;
let listener = tls.bind(&self.address).await?;
let keepalive_settings = self.keepalive.clone();
let shutdown = cx.shutdown;
Ok(Box::pin(async move {
let span = Span::current();
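            // A new Tower service is built per connection so the optional max-connection-age
            // layer can observe that connection's peer address.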
let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
let svc = ServiceBuilder::new()
.layer(build_http_trace_layer(span.clone()))
.option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
MaxConnectionAgeLayer::new(
Duration::from_secs(secs),
keepalive_settings.max_connection_age_jitter_factor,
conn.peer_addr(),
)
}))
.service(warp::service(svc.clone()));
futures_util::future::ok::<_, Infallible>(svc)
});
Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
.serve(make_svc)
.with_graceful_shutdown(shutdown.map(|_| ()))
.await
.map_err(|err| {
error!("An error occurred: {:?}.", err);
})?;
Ok(())
}))
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let schema_definition = self
.decoding
.schema_definition(global_log_namespace.merge(self.log_namespace))
.with_standard_vector_source_metadata()
.with_source_metadata(
Self::NAME,
Some(LegacyKey::InsertIfEmpty(owned_value_path!("request_id"))),
&owned_value_path!("request_id"),
Kind::bytes(),
None,
)
.with_source_metadata(
Self::NAME,
Some(LegacyKey::InsertIfEmpty(owned_value_path!("source_arn"))),
&owned_value_path!("source_arn"),
Kind::bytes(),
None,
);
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
}
fn resources(&self) -> Vec<Resource> {
vec![Resource::tcp(self.address)]
}
fn can_acknowledge(&self) -> bool {
true
}
}
impl GenerateConfig for AwsKinesisFirehoseConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
address: "0.0.0.0:443".parse().unwrap(),
access_key: None,
access_keys: None,
store_access_key: false,
tls: None,
record_compression: Default::default(),
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: Default::default(),
log_namespace: None,
keepalive: Default::default(),
})
.unwrap()
}
}
#[cfg(test)]
mod tests {
    #![allow(clippy::print_stdout)]

    use std::{
io::{Cursor, Read},
net::SocketAddr,
};
use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::Bytes;
use chrono::{DateTime, SubsecRound, Utc};
use flate2::read::GzEncoder;
use futures::Stream;
use similar_asserts::assert_eq;
use tokio::time::{sleep, Duration};
use vector_lib::assert_event_data_eq;
use vector_lib::lookup::path;
use vrl::value;
use super::*;
use crate::{
event::{Event, EventStatus},
log_event,
test_util::{
collect_ready,
components::{assert_source_compliance, SOURCE_TAGS},
next_addr, wait_for_tcp,
},
SourceSender,
};
const SOURCE_ARN: &str = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test";
const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";
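
    /// Sample record payload: an AWS CloudWatch Logs subscription message as delivered through Firehose.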
const RECORD: &str = r#"
{
"messageType": "DATA_MESSAGE",
"owner": "071959437513",
"logGroup": "/jesse/test",
"logStream": "test",
"subscriptionFilters": ["Destination"],
"logEvents": [
{
"id": "35683658089614582423604394983260738922885519999578275840",
"timestamp": 1600110569039,
"message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
},
{
"id": "35683658089659183914001456229543810359430816722590236673",
"timestamp": 1600110569041,
"message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
}
]
}
"#;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<AwsKinesisFirehoseConfig>();
}
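
    /// Spawns an `aws_kinesis_firehose` source with the given options and returns the stream of
    /// emitted events along with the address the source is listening on.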
async fn source(
access_key: Option<SensitiveString>,
access_keys: Option<Vec<SensitiveString>>,
store_access_key: bool,
record_compression: Compression,
delivered: bool,
log_namespace: bool,
) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
use EventStatus::*;
let status = if delivered { Delivered } else { Rejected };
let (sender, recv) = SourceSender::new_test_finalize(status);
let address = next_addr();
let cx = SourceContext::new_test(sender, None);
tokio::spawn(async move {
AwsKinesisFirehoseConfig {
address,
tls: None,
access_key,
access_keys,
store_access_key,
record_compression,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: true.into(),
log_namespace: Some(log_namespace),
keepalive: Default::default(),
}
.build(cx)
.await
.unwrap()
.await
.unwrap()
});
wait_for_tcp(address).await;
(recv, address)
}
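
    /// Builds and sends a Firehose request containing `records` to the source, setting the
    /// headers AWS Kinesis Data Firehose would send.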
async fn send(
address: SocketAddr,
timestamp: DateTime<Utc>,
records: Vec<&[u8]>,
key: Option<&str>,
gzip: bool,
record_compression: Compression,
) -> reqwest::Result<reqwest::Response> {
let request = models::FirehoseRequest {
access_key: key.map(|s| s.to_string()),
request_id: REQUEST_ID.to_string(),
timestamp,
records: records
.into_iter()
.map(|record| models::EncodedFirehoseRecord {
data: encode_record(record, record_compression).unwrap(),
})
.collect(),
};
let mut builder = reqwest::Client::new()
.post(format!("http://{}", address))
.header("host", address.to_string())
.header(
"x-amzn-trace-id",
"Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
)
.header("x-amz-firehose-protocol-version", "1.0")
.header("x-amz-firehose-request-id", REQUEST_ID.to_string())
.header("x-amz-firehose-source-arn", SOURCE_ARN.to_string())
.header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
.header("content-type", "application/json");
if let Some(key) = key {
builder = builder.header("x-amz-firehose-access-key", key);
}
if gzip {
let mut gz = GzEncoder::new(
Cursor::new(serde_json::to_vec(&request).unwrap()),
flate2::Compression::fast(),
);
let mut buffer = Vec::new();
gz.read_to_end(&mut buffer).unwrap();
builder = builder.header("content-encoding", "gzip").body(buffer);
} else {
builder = builder.json(&request);
}
builder.send().await
}
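
    /// Sends the request on a background task, pausing briefly so the source has a chance to
    /// receive the events before the caller collects them.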
async fn spawn_send(
address: SocketAddr,
timestamp: DateTime<Utc>,
records: Vec<&'static [u8]>,
key: Option<&'static str>,
gzip: bool,
record_compression: Compression,
) -> tokio::task::JoinHandle<reqwest::Result<reqwest::Response>> {
let handle = tokio::spawn(async move {
send(address, timestamp, records, key, gzip, record_compression).await
});
sleep(Duration::from_millis(100)).await;
handle
}
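
    /// Encodes a record as it appears in a Firehose request body: optionally gzip-compressed,
    /// then base64-encoded.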
fn encode_record(record: &[u8], compression: Compression) -> std::io::Result<String> {
let compressed = match compression {
Compression::Auto => panic!("cannot encode records as Auto"),
Compression::Gzip => {
let mut buffer = Vec::new();
if !record.is_empty() {
let mut gz = GzEncoder::new(record, flate2::Compression::fast());
gz.read_to_end(&mut buffer)?;
}
buffer
}
Compression::None => record.to_vec(),
};
Ok(BASE64_STANDARD.encode(compressed))
}
#[tokio::test]
async fn aws_kinesis_firehose_forwards_events_legacy_namespace() {
let gzipped_record = {
let mut buf = Vec::new();
let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
gz.read_to_end(&mut buf).unwrap();
buf
};
for (source_record_compression, record_compression, success, record, expected) in [
(
Compression::Auto,
Compression::Gzip,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Auto,
Compression::None,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::None,
Compression::Gzip,
true,
RECORD.as_bytes(),
gzipped_record,
),
(
Compression::None,
Compression::None,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::Gzip,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::None,
false,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::Gzip,
true,
"".as_bytes(),
Vec::new(),
),
] {
let (rx, addr) =
source(None, None, false, source_record_compression, true, false).await;
let timestamp: DateTime<Utc> = Utc::now();
let res = spawn_send(
addr,
timestamp,
vec![record],
None,
false,
record_compression,
)
.await;
if success {
let events = collect_ready(rx).await;
let res = res.await.unwrap().unwrap();
assert_eq!(200, res.status().as_u16());
assert_event_data_eq!(
events,
                    vec![log_event! {
                        "source_type" => Bytes::from("aws_kinesis_firehose"),
                        "timestamp" => timestamp.trunc_subsecs(3),
                        "message" => Bytes::from(expected),
"request_id" => REQUEST_ID,
"source_arn" => SOURCE_ARN,
},]
);
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
} else {
let res = res.await.unwrap().unwrap();
assert_eq!(400, res.status().as_u16());
}
}
}
#[tokio::test]
async fn aws_kinesis_firehose_forwards_events_vector_namespace() {
let gzipped_record = {
let mut buf = Vec::new();
let mut gz = GzEncoder::new(RECORD.as_bytes(), flate2::Compression::fast());
gz.read_to_end(&mut buf).unwrap();
buf
};
for (source_record_compression, record_compression, success, record, expected) in [
(
Compression::Auto,
Compression::Gzip,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Auto,
Compression::None,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::None,
Compression::Gzip,
true,
RECORD.as_bytes(),
gzipped_record,
),
(
Compression::None,
Compression::None,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::Gzip,
true,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::None,
false,
RECORD.as_bytes(),
RECORD.as_bytes().to_owned(),
),
(
Compression::Gzip,
Compression::Gzip,
true,
"".as_bytes(),
Vec::new(),
),
] {
let (rx, addr) = source(None, None, false, source_record_compression, true, true).await;
let timestamp: DateTime<Utc> = Utc::now();
let res = spawn_send(
addr,
timestamp,
vec![record],
None,
false,
record_compression,
)
.await;
if success {
let events = collect_ready(rx).await;
let res = res.await.unwrap().unwrap();
assert_eq!(200, res.status().as_u16());
for event in events {
let log = event.as_log();
let meta = log.metadata();
assert_eq!(log.value(), &value!(Bytes::from(expected.to_owned())));
assert_eq!(
meta.value().get(path!("vector", "source_type")).unwrap(),
&value!("aws_kinesis_firehose")
);
assert!(meta
.value()
.get(path!("vector", "ingest_timestamp"))
.unwrap()
.is_timestamp());
assert_eq!(
meta.value()
.get(path!("aws_kinesis_firehose", "request_id"))
.unwrap(),
&value!(REQUEST_ID)
);
assert_eq!(
meta.value()
.get(path!("aws_kinesis_firehose", "source_arn"))
.unwrap(),
&value!(SOURCE_ARN)
);
assert_eq!(
meta.value()
.get(path!("aws_kinesis_firehose", "timestamp"))
.unwrap(),
&value!(timestamp.trunc_subsecs(3))
);
}
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
} else {
let res = res.await.unwrap().unwrap();
assert_eq!(400, res.status().as_u16());
}
}
}
#[tokio::test]
async fn aws_kinesis_firehose_forwards_events_gzip_request() {
assert_source_compliance(&SOURCE_TAGS, async move {
let (rx, addr) = source(None, None, false, Default::default(), true, false).await;
let timestamp: DateTime<Utc> = Utc::now();
let res = spawn_send(
addr,
timestamp,
vec![RECORD.as_bytes()],
None,
true,
Compression::None,
)
.await;
let events = collect_ready(rx).await;
let res = res.await.unwrap().unwrap();
assert_eq!(200, res.status().as_u16());
assert_event_data_eq!(
events,
                vec![log_event! {
                    "source_type" => Bytes::from("aws_kinesis_firehose"),
                    "timestamp" => timestamp.trunc_subsecs(3),
                    "message" => RECORD,
"request_id" => REQUEST_ID,
"source_arn" => SOURCE_ARN,
},]
);
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
})
.await;
}
#[tokio::test]
async fn aws_kinesis_firehose_rejects_bad_access_key() {
let (_rx, addr) = source(
Some("an access key".to_string().into()),
Some(vec!["an access key in list".to_string().into()]),
Default::default(),
Default::default(),
true,
false,
)
.await;
let res = send(
addr,
Utc::now(),
vec![],
Some("bad access key"),
false,
Compression::None,
)
.await
.unwrap();
assert_eq!(401, res.status().as_u16());
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
}
#[tokio::test]
async fn aws_kinesis_firehose_rejects_bad_access_key_from_list() {
let (_rx, addr) = source(
None,
Some(vec!["an access key in list".to_string().into()]),
Default::default(),
Default::default(),
true,
false,
)
.await;
let res = send(
addr,
Utc::now(),
vec![],
Some("bad access key"),
false,
Compression::None,
)
.await
.unwrap();
assert_eq!(401, res.status().as_u16());
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
}
#[tokio::test]
async fn aws_kinesis_firehose_accepts_merged_access_keys() {
let valid_access_key = SensitiveString::from(String::from("an access key in list"));
let (_rx, addr) = source(
Some(valid_access_key.clone()),
Some(vec!["valid access key 2".to_string().into()]),
Default::default(),
Default::default(),
true,
false,
)
.await;
let res = send(
addr,
Utc::now(),
vec![],
Some(valid_access_key.clone().inner()),
false,
Compression::None,
)
.await
.unwrap();
assert_eq!(200, res.status().as_u16());
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
}
#[tokio::test]
async fn aws_kinesis_firehose_accepts_access_keys_from_list() {
let valid_access_key = "an access key in list".to_string();
let (_rx, addr) = source(
None,
Some(vec![
valid_access_key.clone().into(),
"valid access key 2".to_string().into(),
]),
Default::default(),
Default::default(),
true,
false,
)
.await;
let res = send(
addr,
Utc::now(),
vec![],
Some(&valid_access_key),
false,
Compression::None,
)
.await
.unwrap();
assert_eq!(200, res.status().as_u16());
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
}
#[tokio::test]
async fn handles_acknowledgement_failure() {
let expected = RECORD.as_bytes().to_owned();
let (rx, addr) = source(None, None, false, Compression::None, false, false).await;
let timestamp: DateTime<Utc> = Utc::now();
let res = spawn_send(
addr,
timestamp,
vec![RECORD.as_bytes()],
None,
false,
Compression::None,
)
.await;
let events = collect_ready(rx).await;
let res = res.await.unwrap().unwrap();
assert_eq!(406, res.status().as_u16());
assert_event_data_eq!(
events,
            vec![log_event! {
                "source_type" => Bytes::from("aws_kinesis_firehose"),
                "timestamp" => timestamp.trunc_subsecs(3),
                "message" => Bytes::from(expected),
"request_id" => REQUEST_ID,
"source_arn" => SOURCE_ARN,
},]
);
let response: models::FirehoseResponse = res.json().await.unwrap();
assert_eq!(response.request_id, REQUEST_ID);
}
#[tokio::test]
async fn event_access_key_passthrough_enabled() {
let (rx, address) = source(
None,
Some(vec!["an access key".to_string().into()]),
true,
Default::default(),
true,
true,
)
.await;
let timestamp: DateTime<Utc> = Utc::now();
spawn_send(
address,
timestamp,
vec![RECORD.as_bytes()],
Some("an access key"),
false,
Compression::None,
)
.await;
let events = collect_ready(rx).await;
let access_key = events[0]
.metadata()
.secrets()
.get("aws_kinesis_firehose_access_key")
.unwrap();
assert_eq!(access_key.to_string(), "an access key".to_string());
}
#[tokio::test]
async fn no_authorization_access_key_passthrough_enabled() {
let (rx, address) = source(None, None, true, Default::default(), true, true).await;
let timestamp: DateTime<Utc> = Utc::now();
spawn_send(
address,
timestamp,
vec![RECORD.as_bytes()],
None,
false,
Compression::None,
)
.await;
let events = collect_ready(rx).await;
assert!(events[0]
.metadata()
.secrets()
.get("aws_kinesis_firehose_access_key")
.is_none());
}
}