vector/sources/aws_kinesis_firehose/handlers.rs

use std::io::Read;

use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::codecs::StreamDecodingError;
use vector_lib::lookup::{metadata_path, path, PathPrefix};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    EstimatedJsonEncodedSizeOf,
};
use vector_lib::{
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
    Compression,
};
use crate::{
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
    SourceSender,
};

#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}

/// Publishes decoded events from the FirehoseRequest to the pipeline.
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));

                    let (batch, receiver) = if context.acknowledgements {
                        {
                            let (batch, receiver) = BatchNotifier::new_with_receiver();
                            (Some(batch), Some(receiver))
                        }
                    } else {
                        (None, None)
                    };

                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
                            // This handles the transition from the original timestamp logic. Originally the
                            // `timestamp_key` was always populated with the `request.timestamp` value.
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key {
                                if let Some(access_key) = &request.access_key {
                                    log.metadata_mut().secrets_mut().insert_secret(
                                        "aws_kinesis_firehose_access_key",
                                        access_key,
                                    );
                                }
                            }
                        }
                    }

                    let count = events.len();
                    if let Err(error) = context.out.send_batch(events).await {
                        emit!(StreamClosedError { count });
                        let error = RequestError::ShuttingDown {
                            request_id: request_id.clone(),
                            source: error,
                        };
                        return Err(warp::reject::custom(error));
                    }

                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
                    // The error is logged by `crate::codecs::Decoder`; no further
                    // handling is needed here.
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}

/// Decodes a Firehose record.
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                // only support gzip for now
                Ok(Bytes::from(buf))
            }
        }
    }
}

fn is_gzip(data: &[u8]) -> bool {
    // The header length of a GZIP file is 10 bytes. The first two bytes of the constant come from
    // the GZIP file format specification; they are the fixed member header identification bytes.
    // The third byte is the compression method, of which only one is defined: 8, for the deflate
    // algorithm.
    //
    // Reference: https://datatracker.ietf.org/doc/html/rfc1952 Section 2.3
    data.starts_with(GZIP_MAGIC)
}

fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write as _;

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
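
    // A minimal round-trip sketch for `decode_gzip`: flate2's `MultiGzDecoder` keeps reading across
    // concatenated gzip members (a plain `GzDecoder` stops after the first), so a payload built
    // from several members should decode to the combined bytes. The test name and payload are
    // illustrative, not existing fixtures.
    #[test]
    fn decodes_concatenated_gzip_members() {
        // Build two independent gzip members and concatenate them into one byte buffer.
        let mut members = Vec::new();
        for _ in 0..2 {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
            encoder.write_all(CONTENT).unwrap();
            members.extend(encoder.finish().unwrap());
        }

        let decoded = decode_gzip(&members).unwrap();
        assert_eq!(&decoded[..], &b"ExampleExample"[..]);
    }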
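
    // A sketch of the `decode_record` happy path, assuming `EncodedFirehoseRecord` can be built
    // here from just its base64 `data` field (as the `record.data` access above implies); that
    // assumption is about the model struct, not something this file guarantees. The check is that
    // `Compression::Auto` gunzips gzip payloads and passes plain payloads through unchanged.
    #[test]
    fn decode_record_handles_auto_compression() {
        use base64::prelude::{Engine as _, BASE64_STANDARD};

        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();

        // Hypothetical records carrying a gzipped and a plain payload, both base64 encoded.
        let gzipped_record = EncodedFirehoseRecord {
            data: BASE64_STANDARD.encode(&compressed),
        };
        let plain_record = EncodedFirehoseRecord {
            data: BASE64_STANDARD.encode(CONTENT),
        };

        // `Auto` should detect the gzip magic bytes and decompress, or fall through untouched.
        assert_eq!(
            &decode_record(&gzipped_record, super::Compression::Auto).unwrap()[..],
            CONTENT
        );
        assert_eq!(
            &decode_record(&plain_record, super::Compression::Auto).unwrap()[..],
            CONTENT
        );
    }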
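
    // A sketch of the error path, under the same single-field assumption about
    // `EncodedFirehoseRecord`: data that is not valid base64 should surface as
    // `RecordDecodeError::Base64` before any decompression is attempted.
    #[test]
    fn decode_record_rejects_invalid_base64() {
        let record = EncodedFirehoseRecord {
            data: "definitely not base64!".to_string(),
        };
        assert!(matches!(
            decode_record(&record, super::Compression::None),
            Err(RecordDecodeError::Base64 { .. })
        ));
    }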
}