vector/sources/aws_kinesis_firehose/
errors.rs

1use snafu::Snafu;
2use warp::http::StatusCode;
3
4use super::handlers::RecordDecodeError;
5
6#[derive(Debug, Snafu)]
7#[snafu(visibility(pub(crate)))]
8pub enum RequestError {
9    #[snafu(display(
10        "Missing access key. X-Amz-Firehose-Access-Key required for request: {}",
11        request_id
12    ))]
13    AccessKeyMissing { request_id: String },
14    #[snafu(display(
15        "Invalid access key. X-Amz-Firehose-Access-Key does not match configured access_key for request: {}",
16        request_id
17    ))]
18    AccessKeyInvalid { request_id: String },
19    #[snafu(display("Could not parse incoming request {}: {}", request_id, source))]
20    Parse {
21        source: serde_json::error::Error,
22        request_id: String,
23    },
24    #[snafu(display(
25        "Could not parse records from incoming request {}: {}",
26        request_id,
27        source
28    ))]
29    ParseRecords {
30        source: RecordDecodeError,
31        request_id: String,
32    },
33    #[snafu(display("Could not decode record for request {}: {}", request_id, source))]
34    Decode {
35        source: std::io::Error,
36        request_id: String,
37    },
38    #[snafu(display(
39        "Could not forward events for request {}, downstream is closed: {}",
40        request_id,
41        source
42    ))]
43    ShuttingDown {
44        source: crate::source_sender::ClosedError,
45        request_id: String,
46    },
47    #[snafu(display("Unsupported encoding: {}", encoding))]
48    UnsupportedEncoding {
49        encoding: String,
50        request_id: String,
51    },
52    #[snafu(display("Unsupported protocol version: {}", version))]
53    UnsupportedProtocolVersion { version: String },
54    #[snafu(display("Delivery errored"))]
55    DeliveryErrored { request_id: String },
56    #[snafu(display("Delivery failed"))]
57    DeliveryFailed { request_id: String },
58}
59
60impl warp::reject::Reject for RequestError {}
61
62impl RequestError {
63    pub const fn status(&self) -> StatusCode {
64        use RequestError::*;
65        match *self {
66            AccessKeyMissing { .. } => StatusCode::UNAUTHORIZED,
67            AccessKeyInvalid { .. } => StatusCode::UNAUTHORIZED,
68            Parse { .. } => StatusCode::UNAUTHORIZED,
69            UnsupportedEncoding { .. } => StatusCode::BAD_REQUEST,
70            ParseRecords { .. } => StatusCode::BAD_REQUEST,
71            Decode { .. } => StatusCode::BAD_REQUEST,
72            ShuttingDown { .. } => StatusCode::SERVICE_UNAVAILABLE,
73            UnsupportedProtocolVersion { .. } => StatusCode::BAD_REQUEST,
74            DeliveryErrored { .. } => StatusCode::INTERNAL_SERVER_ERROR,
75            DeliveryFailed { .. } => StatusCode::NOT_ACCEPTABLE,
76        }
77    }
78
79    #[allow(clippy::missing_const_for_fn)] // Adding `const` results in https://doc.rust-lang.org/error_codes/E0015.html
80    pub fn request_id(&self) -> Option<&str> {
81        use RequestError::*;
82        match *self {
83            AccessKeyMissing { ref request_id, .. } => Some(request_id),
84            AccessKeyInvalid { ref request_id, .. } => Some(request_id),
85            Parse { ref request_id, .. } => Some(request_id),
86            UnsupportedEncoding { ref request_id, .. } => Some(request_id),
87            ParseRecords { ref request_id, .. } => Some(request_id),
88            Decode { ref request_id, .. } => Some(request_id),
89            ShuttingDown { ref request_id, .. } => Some(request_id),
90            UnsupportedProtocolVersion { .. } => None,
91            DeliveryErrored { ref request_id } => Some(request_id),
92            DeliveryFailed { ref request_id } => Some(request_id),
93        }
94    }
95}