vector/sources/aws_kinesis_firehose/
filters.rs1use std::{convert::Infallible, io};
2
3use bytes::{Buf, Bytes};
4use chrono::Utc;
5use flate2::read::MultiGzDecoder;
6use snafu::ResultExt;
7use vector_lib::{
8 config::LogNamespace,
9 internal_event::{BytesReceived, Protocol},
10};
11use warp::{Filter, http::StatusCode};
12
13use super::{
14 Compression,
15 errors::{ParseSnafu, RequestError},
16 handlers,
17 models::{FirehoseRequest, FirehoseResponse},
18};
19use crate::{
20 SourceSender, codecs,
21 internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
22};
23
24pub fn firehose(
26 access_keys: Vec<String>,
27 store_access_key: bool,
28 record_compression: Compression,
29 decoder: codecs::Decoder,
30 acknowledgements: bool,
31 out: SourceSender,
32 log_namespace: LogNamespace,
33) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
34 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
35 let context = handlers::Context {
36 compression: record_compression,
37 store_access_key,
38 decoder,
39 acknowledgements,
40 bytes_received,
41 out,
42 log_namespace,
43 };
44 warp::post()
45 .and(emit_received())
46 .and(authenticate(access_keys))
47 .and(warp::header("X-Amz-Firehose-Request-Id"))
48 .and(warp::header("X-Amz-Firehose-Source-Arn"))
49 .and(
50 warp::header("X-Amz-Firehose-Protocol-Version")
51 .and_then(|version: String| async move {
52 match version.as_str() {
53 "1.0" => Ok(()),
54 _ => Err(warp::reject::custom(
55 RequestError::UnsupportedProtocolVersion { version },
56 )),
57 }
58 })
59 .untuple_one(),
60 )
61 .and(parse_body())
62 .and(warp::any().map(move || context.clone()))
63 .and_then(handlers::firehose)
64 .recover(handle_firehose_rejection)
65}
66
67fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::reject::Rejection> + Clone
71{
72 warp::any()
73 .and(warp::header::optional::<String>("Content-Encoding"))
74 .and(warp::header("X-Amz-Firehose-Request-Id"))
75 .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
76 .and(warp::body::bytes())
77 .and_then(
78 |encoding: Option<String>,
79 request_id: String,
80 access_key: Option<String>,
81 body: Bytes| async move {
82 match encoding {
83 Some(s) if s == "gzip" => {
84 Ok(Box::new(MultiGzDecoder::new(body.reader())) as Box<dyn io::Read>)
85 }
86 Some(s) => Err(warp::reject::Rejection::from(
87 RequestError::UnsupportedEncoding {
88 encoding: s,
89 request_id: request_id.clone(),
90 },
91 )),
92 None => Ok(Box::new(body.reader()) as Box<dyn io::Read>),
93 }
94 .and_then(|r| {
95 serde_json::from_reader(r)
96 .context(ParseSnafu {
97 request_id: request_id.clone(),
98 })
99 .map(|request: FirehoseRequest| FirehoseRequest {
100 access_key,
101 ..request
102 })
103 .map_err(warp::reject::custom)
104 })
105 },
106 )
107}
108
109fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
110 warp::any()
111 .and(warp::header::optional("X-Amz-Firehose-Request-Id"))
112 .and(warp::header::optional("X-Amz-Firehose-Source-Arn"))
113 .map(|request_id: Option<String>, source_arn: Option<String>| {
114 emit!(AwsKinesisFirehoseRequestReceived {
115 request_id: request_id.as_deref(),
116 source_arn: source_arn.as_deref(),
117 });
118 })
119 .untuple_one()
120}
121
122fn authenticate(
124 configured_access_keys: Vec<String>,
125) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
126 warp::any()
127 .and(warp::header("X-Amz-Firehose-Request-Id"))
128 .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
129 .and_then(move |request_id: String, access_key: Option<String>| {
130 let configured_access_keys = configured_access_keys.clone();
131
132 async move {
133 match (access_key, configured_access_keys.is_empty()) {
134 (_, true) => Ok(()),
136 (Some(access_key), false) if configured_access_keys.contains(&access_key) => {
138 Ok(())
139 }
140 (Some(_), false) => Err(warp::reject::custom(RequestError::AccessKeyInvalid {
142 request_id,
143 })),
144 (None, false) => Err(warp::reject::custom(RequestError::AccessKeyMissing {
146 request_id,
147 })),
148 }
149 }
150 })
151 .untuple_one()
152}
153
154async fn handle_firehose_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
156 let request_id: Option<&str>;
157 let message: String;
158 let code: StatusCode;
159
160 if let Some(e) = err.find::<RequestError>() {
161 message = e.to_string();
162 code = e.status();
163 request_id = e.request_id();
164 } else if let Some(e) = err.find::<warp::reject::MissingHeader>() {
165 code = StatusCode::BAD_REQUEST;
166 message = format!("Required header missing: {}", e.name());
167 request_id = None;
168 } else {
169 code = StatusCode::INTERNAL_SERVER_ERROR;
170 message = format!("{err:?}");
171 request_id = None;
172 }
173
174 emit!(AwsKinesisFirehoseRequestError::new(
175 code,
176 message.as_str(),
177 request_id
178 ));
179
180 let json = warp::reply::json(&FirehoseResponse {
181 request_id: request_id.unwrap_or_default().to_string(),
182 timestamp: Utc::now(),
183 error_message: Some(message),
184 });
185
186 Ok(warp::reply::with_status(json, code))
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Request id shared by the request header and the serialized body.
    const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";

    /// Serializes a `FirehoseRequest` with no records to JSON and gzip-compresses
    /// it at the fastest compression level.
    fn gzipped_empty_request() -> Vec<u8> {
        let json = serde_json::to_vec(&FirehoseRequest {
            access_key: None,
            request_id: REQUEST_ID.to_owned(),
            records: Vec::new(),
            timestamp: Utc::now(),
        })
        .unwrap();

        let mut encoder =
            flate2::read::GzEncoder::new(io::Cursor::new(json), flate2::Compression::fast());
        let mut compressed = Vec::new();
        io::Read::read_to_end(&mut encoder, &mut compressed).unwrap();
        compressed
    }

    /// `parse_body` must decode a gzip body and take the access key from the
    /// `X-Amz-Firehose-Access-Key` header.
    #[tokio::test]
    async fn request_construction() {
        let parsed = warp::test::request()
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("X-Amz-Firehose-Request-Id", REQUEST_ID)
            .header(
                "x-amz-firehose-source-arn",
                "arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
            )
            .header("x-amz-firehose-access-key", "secret123")
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json")
            .header("Content-Encoding", "gzip")
            .body(gzipped_empty_request())
            .filter(&parse_body())
            .await
            .unwrap();

        assert_eq!(parsed.access_key, Some("secret123".to_owned()));
    }
}