vector/sources/aws_kinesis_firehose/
filters.rsuse std::{convert::Infallible, io};
use bytes::{Buf, Bytes};
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use snafu::ResultExt;
use vector_lib::config::LogNamespace;
use vector_lib::internal_event::{BytesReceived, Protocol};
use warp::{http::StatusCode, Filter};
use super::{
errors::{ParseSnafu, RequestError},
handlers,
models::{FirehoseRequest, FirehoseResponse},
Compression,
};
use crate::{
codecs,
internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
SourceSender,
};
pub fn firehose(
access_keys: Vec<String>,
store_access_key: bool,
record_compression: Compression,
decoder: codecs::Decoder,
acknowledgements: bool,
out: SourceSender,
log_namespace: LogNamespace,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
let context = handlers::Context {
compression: record_compression,
store_access_key,
decoder,
acknowledgements,
bytes_received,
out,
log_namespace,
};
warp::post()
.and(emit_received())
.and(authenticate(access_keys))
.and(warp::header("X-Amz-Firehose-Request-Id"))
.and(warp::header("X-Amz-Firehose-Source-Arn"))
.and(
warp::header("X-Amz-Firehose-Protocol-Version")
.and_then(|version: String| async move {
match version.as_str() {
"1.0" => Ok(()),
_ => Err(warp::reject::custom(
RequestError::UnsupportedProtocolVersion { version },
)),
}
})
.untuple_one(),
)
.and(parse_body())
.and(warp::any().map(move || context.clone()))
.and_then(handlers::firehose)
.recover(handle_firehose_rejection)
}
fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::reject::Rejection> + Clone
{
warp::any()
.and(warp::header::optional::<String>("Content-Encoding"))
.and(warp::header("X-Amz-Firehose-Request-Id"))
.and(warp::header::optional("X-Amz-Firehose-Access-Key"))
.and(warp::body::bytes())
.and_then(
|encoding: Option<String>,
request_id: String,
access_key: Option<String>,
body: Bytes| async move {
match encoding {
Some(s) if s == "gzip" => {
Ok(Box::new(MultiGzDecoder::new(body.reader())) as Box<dyn io::Read>)
}
Some(s) => Err(warp::reject::Rejection::from(
RequestError::UnsupportedEncoding {
encoding: s,
request_id: request_id.clone(),
},
)),
None => Ok(Box::new(body.reader()) as Box<dyn io::Read>),
}
.and_then(|r| {
serde_json::from_reader(r)
.context(ParseSnafu {
request_id: request_id.clone(),
})
.map(|request: FirehoseRequest| FirehoseRequest {
access_key,
..request
})
.map_err(warp::reject::custom)
})
},
)
}
fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
warp::any()
.and(warp::header::optional("X-Amz-Firehose-Request-Id"))
.and(warp::header::optional("X-Amz-Firehose-Source-Arn"))
.map(|request_id: Option<String>, source_arn: Option<String>| {
emit!(AwsKinesisFirehoseRequestReceived {
request_id: request_id.as_deref(),
source_arn: source_arn.as_deref(),
});
})
.untuple_one()
}
fn authenticate(
configured_access_keys: Vec<String>,
) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
warp::any()
.and(warp::header("X-Amz-Firehose-Request-Id"))
.and(warp::header::optional("X-Amz-Firehose-Access-Key"))
.and_then(move |request_id: String, access_key: Option<String>| {
let configured_access_keys = configured_access_keys.clone();
async move {
match (access_key, configured_access_keys.is_empty()) {
(_, true) => Ok(()),
(Some(access_key), false) if configured_access_keys.contains(&access_key) => {
Ok(())
}
(Some(_), false) => Err(warp::reject::custom(RequestError::AccessKeyInvalid {
request_id,
})),
(None, false) => Err(warp::reject::custom(RequestError::AccessKeyMissing {
request_id,
})),
}
}
})
.untuple_one()
}
async fn handle_firehose_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
let request_id: Option<&str>;
let message: String;
let code: StatusCode;
if let Some(e) = err.find::<RequestError>() {
message = e.to_string();
code = e.status();
request_id = e.request_id();
} else if let Some(e) = err.find::<warp::reject::MissingHeader>() {
code = StatusCode::BAD_REQUEST;
message = format!("Required header missing: {}", e.name());
request_id = None;
} else {
code = StatusCode::INTERNAL_SERVER_ERROR;
message = format!("{:?}", err);
request_id = None;
}
emit!(AwsKinesisFirehoseRequestError::new(
code,
message.as_str(),
request_id
));
let json = warp::reply::json(&FirehoseResponse {
request_id: request_id.unwrap_or_default().to_string(),
timestamp: Utc::now(),
error_message: Some(message),
});
Ok(warp::reply::with_status(json, code))
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn request_construction() {
let parsed = warp::test::request()
.header(
"x-amzn-trace-id",
"Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
)
.header("x-amz-firehose-protocol-version", "1.0")
.header(
"X-Amz-Firehose-Request-Id",
"e17265d6-97af-4938-982e-90d5614c4242",
)
.header(
"x-amz-firehose-source-arn",
"arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
)
.header("x-amz-firehose-access-key", "secret123")
.header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
.header("content-type", "application/json")
.header("Content-Encoding", "gzip")
.body({
let mut gz = flate2::read::GzEncoder::new(
io::Cursor::new(
serde_json::to_vec(&FirehoseRequest {
access_key: None,
request_id: "e17265d6-97af-4938-982e-90d5614c4242".to_owned(),
records: Vec::new(),
timestamp: Utc::now(),
})
.unwrap(),
),
flate2::Compression::fast(),
);
let mut buffer = Vec::new();
io::Read::read_to_end(&mut gz, &mut buffer).unwrap();
buffer
})
.filter(&parse_body())
.await
.unwrap();
assert_eq!(parsed.access_key, Some("secret123".to_owned()));
}
}