vector/sources/aws_kinesis_firehose/
filters.rs

1use std::{convert::Infallible, io};
2
3use bytes::{Buf, Bytes};
4use chrono::Utc;
5use flate2::read::MultiGzDecoder;
6use snafu::ResultExt;
7use vector_lib::config::LogNamespace;
8use vector_lib::internal_event::{BytesReceived, Protocol};
9use warp::{http::StatusCode, Filter};
10
11use super::{
12    errors::{ParseSnafu, RequestError},
13    handlers,
14    models::{FirehoseRequest, FirehoseResponse},
15    Compression,
16};
17use crate::{
18    codecs,
19    internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
20    SourceSender,
21};
22
23/// Handles routing of incoming HTTP requests from AWS Kinesis Firehose
24pub fn firehose(
25    access_keys: Vec<String>,
26    store_access_key: bool,
27    record_compression: Compression,
28    decoder: codecs::Decoder,
29    acknowledgements: bool,
30    out: SourceSender,
31    log_namespace: LogNamespace,
32) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
33    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
34    let context = handlers::Context {
35        compression: record_compression,
36        store_access_key,
37        decoder,
38        acknowledgements,
39        bytes_received,
40        out,
41        log_namespace,
42    };
43    warp::post()
44        .and(emit_received())
45        .and(authenticate(access_keys))
46        .and(warp::header("X-Amz-Firehose-Request-Id"))
47        .and(warp::header("X-Amz-Firehose-Source-Arn"))
48        .and(
49            warp::header("X-Amz-Firehose-Protocol-Version")
50                .and_then(|version: String| async move {
51                    match version.as_str() {
52                        "1.0" => Ok(()),
53                        _ => Err(warp::reject::custom(
54                            RequestError::UnsupportedProtocolVersion { version },
55                        )),
56                    }
57                })
58                .untuple_one(),
59        )
60        .and(parse_body())
61        .and(warp::any().map(move || context.clone()))
62        .and_then(handlers::firehose)
63        .recover(handle_firehose_rejection)
64}
65
66/// Decode (if needed) and parse request body
67///
68/// Firehose can be configured to gzip compress messages so we handle this here
69fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::reject::Rejection> + Clone
70{
71    warp::any()
72        .and(warp::header::optional::<String>("Content-Encoding"))
73        .and(warp::header("X-Amz-Firehose-Request-Id"))
74        .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
75        .and(warp::body::bytes())
76        .and_then(
77            |encoding: Option<String>,
78             request_id: String,
79             access_key: Option<String>,
80             body: Bytes| async move {
81                match encoding {
82                    Some(s) if s == "gzip" => {
83                        Ok(Box::new(MultiGzDecoder::new(body.reader())) as Box<dyn io::Read>)
84                    }
85                    Some(s) => Err(warp::reject::Rejection::from(
86                        RequestError::UnsupportedEncoding {
87                            encoding: s,
88                            request_id: request_id.clone(),
89                        },
90                    )),
91                    None => Ok(Box::new(body.reader()) as Box<dyn io::Read>),
92                }
93                .and_then(|r| {
94                    serde_json::from_reader(r)
95                        .context(ParseSnafu {
96                            request_id: request_id.clone(),
97                        })
98                        .map(|request: FirehoseRequest| FirehoseRequest {
99                            access_key,
100                            ..request
101                        })
102                        .map_err(warp::reject::custom)
103                })
104            },
105        )
106}
107
108fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
109    warp::any()
110        .and(warp::header::optional("X-Amz-Firehose-Request-Id"))
111        .and(warp::header::optional("X-Amz-Firehose-Source-Arn"))
112        .map(|request_id: Option<String>, source_arn: Option<String>| {
113            emit!(AwsKinesisFirehoseRequestReceived {
114                request_id: request_id.as_deref(),
115                source_arn: source_arn.as_deref(),
116            });
117        })
118        .untuple_one()
119}
120
121/// If there is a configured access key, validate that the request key matches it
122fn authenticate(
123    configured_access_keys: Vec<String>,
124) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
125    warp::any()
126        .and(warp::header("X-Amz-Firehose-Request-Id"))
127        .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
128        .and_then(move |request_id: String, access_key: Option<String>| {
129            let configured_access_keys = configured_access_keys.clone();
130
131            async move {
132                match (access_key, configured_access_keys.is_empty()) {
133                    // No configured access keys
134                    (_, true) => Ok(()),
135                    // Passed access key is present in configured access keys
136                    (Some(access_key), false) if configured_access_keys.contains(&access_key) => {
137                        Ok(())
138                    }
139                    // No configured access keys, but passed with the request
140                    (Some(_), false) => Err(warp::reject::custom(RequestError::AccessKeyInvalid {
141                        request_id,
142                    })),
143                    // Access keys are configured, but missing from the request
144                    (None, false) => Err(warp::reject::custom(RequestError::AccessKeyMissing {
145                        request_id,
146                    })),
147                }
148            }
149        })
150        .untuple_one()
151}
152
153/// Maps RequestError and warp errors to AWS Kinesis Firehose response structure
154async fn handle_firehose_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
155    let request_id: Option<&str>;
156    let message: String;
157    let code: StatusCode;
158
159    if let Some(e) = err.find::<RequestError>() {
160        message = e.to_string();
161        code = e.status();
162        request_id = e.request_id();
163    } else if let Some(e) = err.find::<warp::reject::MissingHeader>() {
164        code = StatusCode::BAD_REQUEST;
165        message = format!("Required header missing: {}", e.name());
166        request_id = None;
167    } else {
168        code = StatusCode::INTERNAL_SERVER_ERROR;
169        message = format!("{err:?}");
170        request_id = None;
171    }
172
173    emit!(AwsKinesisFirehoseRequestError::new(
174        code,
175        message.as_str(),
176        request_id
177    ));
178
179    let json = warp::reply::json(&FirehoseResponse {
180        request_id: request_id.unwrap_or_default().to_string(),
181        timestamp: Utc::now(),
182        error_message: Some(message),
183    });
184
185    Ok(warp::reply::with_status(json, code))
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// A gzip-encoded body is decoded and parsed, and the access key from the
    /// request header ends up on the parsed request.
    #[tokio::test]
    async fn request_construction() {
        const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";

        // Serialize a request without an access key, then gzip it.
        let payload = serde_json::to_vec(&FirehoseRequest {
            access_key: None,
            request_id: REQUEST_ID.to_owned(),
            records: Vec::new(),
            timestamp: Utc::now(),
        })
        .unwrap();

        let mut encoder =
            flate2::read::GzEncoder::new(io::Cursor::new(payload), flate2::Compression::fast());
        let mut compressed = Vec::new();
        io::Read::read_to_end(&mut encoder, &mut compressed).unwrap();

        let parsed = warp::test::request()
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("X-Amz-Firehose-Request-Id", REQUEST_ID)
            .header(
                "x-amz-firehose-source-arn",
                "arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
            )
            .header("x-amz-firehose-access-key", "secret123")
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json")
            .header("Content-Encoding", "gzip")
            .body(compressed)
            .filter(&parse_body())
            .await
            .unwrap();

        assert_eq!(parsed.access_key, Some("secret123".to_owned()));
    }
}