vector/sources/aws_kinesis_firehose/
filters.rs1use std::{convert::Infallible, io};
2
3use bytes::{Buf, Bytes};
4use chrono::Utc;
5use flate2::read::MultiGzDecoder;
6use snafu::ResultExt;
7use vector_lib::config::LogNamespace;
8use vector_lib::internal_event::{BytesReceived, Protocol};
9use warp::{http::StatusCode, Filter};
10
11use super::{
12 errors::{ParseSnafu, RequestError},
13 handlers,
14 models::{FirehoseRequest, FirehoseResponse},
15 Compression,
16};
17use crate::{
18 codecs,
19 internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
20 SourceSender,
21};
22
23pub fn firehose(
25 access_keys: Vec<String>,
26 store_access_key: bool,
27 record_compression: Compression,
28 decoder: codecs::Decoder,
29 acknowledgements: bool,
30 out: SourceSender,
31 log_namespace: LogNamespace,
32) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
33 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
34 let context = handlers::Context {
35 compression: record_compression,
36 store_access_key,
37 decoder,
38 acknowledgements,
39 bytes_received,
40 out,
41 log_namespace,
42 };
43 warp::post()
44 .and(emit_received())
45 .and(authenticate(access_keys))
46 .and(warp::header("X-Amz-Firehose-Request-Id"))
47 .and(warp::header("X-Amz-Firehose-Source-Arn"))
48 .and(
49 warp::header("X-Amz-Firehose-Protocol-Version")
50 .and_then(|version: String| async move {
51 match version.as_str() {
52 "1.0" => Ok(()),
53 _ => Err(warp::reject::custom(
54 RequestError::UnsupportedProtocolVersion { version },
55 )),
56 }
57 })
58 .untuple_one(),
59 )
60 .and(parse_body())
61 .and(warp::any().map(move || context.clone()))
62 .and_then(handlers::firehose)
63 .recover(handle_firehose_rejection)
64}
65
66fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::reject::Rejection> + Clone
70{
71 warp::any()
72 .and(warp::header::optional::<String>("Content-Encoding"))
73 .and(warp::header("X-Amz-Firehose-Request-Id"))
74 .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
75 .and(warp::body::bytes())
76 .and_then(
77 |encoding: Option<String>,
78 request_id: String,
79 access_key: Option<String>,
80 body: Bytes| async move {
81 match encoding {
82 Some(s) if s == "gzip" => {
83 Ok(Box::new(MultiGzDecoder::new(body.reader())) as Box<dyn io::Read>)
84 }
85 Some(s) => Err(warp::reject::Rejection::from(
86 RequestError::UnsupportedEncoding {
87 encoding: s,
88 request_id: request_id.clone(),
89 },
90 )),
91 None => Ok(Box::new(body.reader()) as Box<dyn io::Read>),
92 }
93 .and_then(|r| {
94 serde_json::from_reader(r)
95 .context(ParseSnafu {
96 request_id: request_id.clone(),
97 })
98 .map(|request: FirehoseRequest| FirehoseRequest {
99 access_key,
100 ..request
101 })
102 .map_err(warp::reject::custom)
103 })
104 },
105 )
106}
107
108fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
109 warp::any()
110 .and(warp::header::optional("X-Amz-Firehose-Request-Id"))
111 .and(warp::header::optional("X-Amz-Firehose-Source-Arn"))
112 .map(|request_id: Option<String>, source_arn: Option<String>| {
113 emit!(AwsKinesisFirehoseRequestReceived {
114 request_id: request_id.as_deref(),
115 source_arn: source_arn.as_deref(),
116 });
117 })
118 .untuple_one()
119}
120
121fn authenticate(
123 configured_access_keys: Vec<String>,
124) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
125 warp::any()
126 .and(warp::header("X-Amz-Firehose-Request-Id"))
127 .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
128 .and_then(move |request_id: String, access_key: Option<String>| {
129 let configured_access_keys = configured_access_keys.clone();
130
131 async move {
132 match (access_key, configured_access_keys.is_empty()) {
133 (_, true) => Ok(()),
135 (Some(access_key), false) if configured_access_keys.contains(&access_key) => {
137 Ok(())
138 }
139 (Some(_), false) => Err(warp::reject::custom(RequestError::AccessKeyInvalid {
141 request_id,
142 })),
143 (None, false) => Err(warp::reject::custom(RequestError::AccessKeyMissing {
145 request_id,
146 })),
147 }
148 }
149 })
150 .untuple_one()
151}
152
153async fn handle_firehose_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
155 let request_id: Option<&str>;
156 let message: String;
157 let code: StatusCode;
158
159 if let Some(e) = err.find::<RequestError>() {
160 message = e.to_string();
161 code = e.status();
162 request_id = e.request_id();
163 } else if let Some(e) = err.find::<warp::reject::MissingHeader>() {
164 code = StatusCode::BAD_REQUEST;
165 message = format!("Required header missing: {}", e.name());
166 request_id = None;
167 } else {
168 code = StatusCode::INTERNAL_SERVER_ERROR;
169 message = format!("{err:?}");
170 request_id = None;
171 }
172
173 emit!(AwsKinesisFirehoseRequestError::new(
174 code,
175 message.as_str(),
176 request_id
177 ));
178
179 let json = warp::reply::json(&FirehoseResponse {
180 request_id: request_id.unwrap_or_default().to_string(),
181 timestamp: Utc::now(),
182 error_message: Some(message),
183 });
184
185 Ok(warp::reply::with_status(json, code))
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Gzip-compresses `raw` with the fast compression level.
    fn gzip(raw: Vec<u8>) -> Vec<u8> {
        let mut encoder =
            flate2::read::GzEncoder::new(io::Cursor::new(raw), flate2::Compression::fast());
        let mut compressed = Vec::new();
        io::Read::read_to_end(&mut encoder, &mut compressed).unwrap();
        compressed
    }

    /// A gzip-encoded body must parse, and the access key must be taken from
    /// the request header even though the JSON body carries none.
    #[tokio::test]
    async fn request_construction() {
        const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";

        let payload = serde_json::to_vec(&FirehoseRequest {
            access_key: None,
            request_id: REQUEST_ID.to_owned(),
            records: Vec::new(),
            timestamp: Utc::now(),
        })
        .unwrap();

        let parsed = warp::test::request()
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("X-Amz-Firehose-Request-Id", REQUEST_ID)
            .header(
                "x-amz-firehose-source-arn",
                "arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
            )
            .header("x-amz-firehose-access-key", "secret123")
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json")
            .header("Content-Encoding", "gzip")
            .body(gzip(payload))
            .filter(&parse_body())
            .await
            .unwrap();

        assert_eq!(parsed.access_key, Some("secret123".to_owned()));
    }
}