vector/sources/aws_kinesis_firehose/
filters.rs

1use std::{convert::Infallible, io};
2
3use bytes::{Buf, Bytes};
4use chrono::Utc;
5use flate2::read::MultiGzDecoder;
6use snafu::ResultExt;
7use vector_lib::{
8    config::LogNamespace,
9    internal_event::{BytesReceived, Protocol},
10};
11use warp::{Filter, http::StatusCode};
12
13use super::{
14    Compression,
15    errors::{ParseSnafu, RequestError},
16    handlers,
17    models::{FirehoseRequest, FirehoseResponse},
18};
19use crate::{
20    SourceSender, codecs,
21    internal_events::{AwsKinesisFirehoseRequestError, AwsKinesisFirehoseRequestReceived},
22};
23
24/// Handles routing of incoming HTTP requests from AWS Kinesis Firehose
25pub fn firehose(
26    access_keys: Vec<String>,
27    store_access_key: bool,
28    record_compression: Compression,
29    decoder: codecs::Decoder,
30    acknowledgements: bool,
31    out: SourceSender,
32    log_namespace: LogNamespace,
33) -> impl Filter<Extract = (impl warp::Reply,), Error = Infallible> + Clone {
34    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
35    let context = handlers::Context {
36        compression: record_compression,
37        store_access_key,
38        decoder,
39        acknowledgements,
40        bytes_received,
41        out,
42        log_namespace,
43    };
44    warp::post()
45        .and(emit_received())
46        .and(authenticate(access_keys))
47        .and(warp::header("X-Amz-Firehose-Request-Id"))
48        .and(warp::header("X-Amz-Firehose-Source-Arn"))
49        .and(
50            warp::header("X-Amz-Firehose-Protocol-Version")
51                .and_then(|version: String| async move {
52                    match version.as_str() {
53                        "1.0" => Ok(()),
54                        _ => Err(warp::reject::custom(
55                            RequestError::UnsupportedProtocolVersion { version },
56                        )),
57                    }
58                })
59                .untuple_one(),
60        )
61        .and(parse_body())
62        .and(warp::any().map(move || context.clone()))
63        .and_then(handlers::firehose)
64        .recover(handle_firehose_rejection)
65}
66
67/// Decode (if needed) and parse request body
68///
69/// Firehose can be configured to gzip compress messages so we handle this here
70fn parse_body() -> impl Filter<Extract = (FirehoseRequest,), Error = warp::reject::Rejection> + Clone
71{
72    warp::any()
73        .and(warp::header::optional::<String>("Content-Encoding"))
74        .and(warp::header("X-Amz-Firehose-Request-Id"))
75        .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
76        .and(warp::body::bytes())
77        .and_then(
78            |encoding: Option<String>,
79             request_id: String,
80             access_key: Option<String>,
81             body: Bytes| async move {
82                match encoding {
83                    Some(s) if s == "gzip" => {
84                        Ok(Box::new(MultiGzDecoder::new(body.reader())) as Box<dyn io::Read>)
85                    }
86                    Some(s) => Err(warp::reject::Rejection::from(
87                        RequestError::UnsupportedEncoding {
88                            encoding: s,
89                            request_id: request_id.clone(),
90                        },
91                    )),
92                    None => Ok(Box::new(body.reader()) as Box<dyn io::Read>),
93                }
94                .and_then(|r| {
95                    serde_json::from_reader(r)
96                        .context(ParseSnafu {
97                            request_id: request_id.clone(),
98                        })
99                        .map(|request: FirehoseRequest| FirehoseRequest {
100                            access_key,
101                            ..request
102                        })
103                        .map_err(warp::reject::custom)
104                })
105            },
106        )
107}
108
109fn emit_received() -> impl Filter<Extract = (), Error = warp::reject::Rejection> + Clone {
110    warp::any()
111        .and(warp::header::optional("X-Amz-Firehose-Request-Id"))
112        .and(warp::header::optional("X-Amz-Firehose-Source-Arn"))
113        .map(|request_id: Option<String>, source_arn: Option<String>| {
114            emit!(AwsKinesisFirehoseRequestReceived {
115                request_id: request_id.as_deref(),
116                source_arn: source_arn.as_deref(),
117            });
118        })
119        .untuple_one()
120}
121
122/// If there is a configured access key, validate that the request key matches it
123fn authenticate(
124    configured_access_keys: Vec<String>,
125) -> impl Filter<Extract = (), Error = warp::Rejection> + Clone {
126    warp::any()
127        .and(warp::header("X-Amz-Firehose-Request-Id"))
128        .and(warp::header::optional("X-Amz-Firehose-Access-Key"))
129        .and_then(move |request_id: String, access_key: Option<String>| {
130            let configured_access_keys = configured_access_keys.clone();
131
132            async move {
133                match (access_key, configured_access_keys.is_empty()) {
134                    // No configured access keys
135                    (_, true) => Ok(()),
136                    // Passed access key is present in configured access keys
137                    (Some(access_key), false) if configured_access_keys.contains(&access_key) => {
138                        Ok(())
139                    }
140                    // No configured access keys, but passed with the request
141                    (Some(_), false) => Err(warp::reject::custom(RequestError::AccessKeyInvalid {
142                        request_id,
143                    })),
144                    // Access keys are configured, but missing from the request
145                    (None, false) => Err(warp::reject::custom(RequestError::AccessKeyMissing {
146                        request_id,
147                    })),
148                }
149            }
150        })
151        .untuple_one()
152}
153
154/// Maps RequestError and warp errors to AWS Kinesis Firehose response structure
155async fn handle_firehose_rejection(err: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
156    let request_id: Option<&str>;
157    let message: String;
158    let code: StatusCode;
159
160    if let Some(e) = err.find::<RequestError>() {
161        message = e.to_string();
162        code = e.status();
163        request_id = e.request_id();
164    } else if let Some(e) = err.find::<warp::reject::MissingHeader>() {
165        code = StatusCode::BAD_REQUEST;
166        message = format!("Required header missing: {}", e.name());
167        request_id = None;
168    } else {
169        code = StatusCode::INTERNAL_SERVER_ERROR;
170        message = format!("{err:?}");
171        request_id = None;
172    }
173
174    emit!(AwsKinesisFirehoseRequestError::new(
175        code,
176        message.as_str(),
177        request_id
178    ));
179
180    let json = warp::reply::json(&FirehoseResponse {
181        request_id: request_id.unwrap_or_default().to_string(),
182        timestamp: Utc::now(),
183        error_message: Some(message),
184    });
185
186    Ok(warp::reply::with_status(json, code))
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `parse_body` decodes a gzip-compressed JSON body and copies
    /// the `X-Amz-Firehose-Access-Key` header value into the parsed request.
    #[tokio::test]
    async fn request_construction() {
        const REQUEST_ID: &str = "e17265d6-97af-4938-982e-90d5614c4242";

        // Serialize a request with no access key, then gzip it.
        let payload = serde_json::to_vec(&FirehoseRequest {
            access_key: None,
            request_id: REQUEST_ID.to_owned(),
            records: Vec::new(),
            timestamp: Utc::now(),
        })
        .unwrap();

        let mut encoder =
            flate2::read::GzEncoder::new(io::Cursor::new(payload), flate2::Compression::fast());
        let mut compressed = Vec::new();
        io::Read::read_to_end(&mut encoder, &mut compressed).unwrap();

        let parsed = warp::test::request()
            .header(
                "x-amzn-trace-id",
                "Root=1-5f5fbf1c-877c68cace58bea222ddbeec",
            )
            .header("x-amz-firehose-protocol-version", "1.0")
            .header("X-Amz-Firehose-Request-Id", REQUEST_ID)
            .header(
                "x-amz-firehose-source-arn",
                "arn:aws:firehose:us-east-1:111111111111:deliverystream/test",
            )
            .header("x-amz-firehose-access-key", "secret123")
            .header("user-agent", "Amazon Kinesis Data Firehose Agent/1.0")
            .header("content-type", "application/json")
            .header("Content-Encoding", "gzip")
            .body(compressed)
            .filter(&parse_body())
            .await
            .unwrap();

        // The access key must come from the header, not from the body.
        assert_eq!(parsed.access_key, Some("secret123".to_owned()));
    }
}