vector/sources/aws_kinesis_firehose/
handlers.rs

1use std::io::Read;
2
3use base64::prelude::{BASE64_STANDARD, Engine as _};
4use bytes::Bytes;
5use chrono::Utc;
6use flate2::read::MultiGzDecoder;
7use futures::StreamExt;
8use snafu::{ResultExt, Snafu};
9use vector_common::constants::GZIP_MAGIC;
10use vector_lib::{
11    EstimatedJsonEncodedSizeOf,
12    codecs::{DecoderFramedRead, StreamDecodingError},
13    config::{LegacyKey, LogNamespace},
14    event::BatchNotifier,
15    finalization::AddBatchNotifier,
16    internal_event::{
17        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
18    },
19    lookup::{PathPrefix, metadata_path, path},
20    source_sender::SendError,
21};
22use vrl::compiler::SecretTarget;
23use warp::reject;
24
25use super::{
26    Compression,
27    errors::{ParseRecordsSnafu, RequestError},
28    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
29};
30use crate::{
31    SourceSender,
32    codecs::Decoder,
33    config::log_schema,
34    event::{BatchStatus, Event},
35    internal_events::{
36        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
37    },
38    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
39};
40
#[derive(Clone)]
/// Per-source state shared by every request handled by `firehose`.
pub(super) struct Context {
    // How record payloads are (de)compressed; consumed by `decode_record`.
    pub(super) compression: Compression,
    // When true, the request's access key is stored as a secret on each log event.
    pub(super) store_access_key: bool,
    // Framing + deserialization applied to each decoded record's bytes.
    pub(super) decoder: Decoder,
    // When true, a `BatchNotifier` is attached and delivery status is awaited.
    pub(super) acknowledgements: bool,
    // Pre-registered handle for the bytes-received internal metric.
    pub(super) bytes_received: Registered<BytesReceived>,
    // Output channel into the topology.
    pub(super) out: SourceSender,
    // Controls Vector vs. Legacy event metadata layout.
    pub(super) log_namespace: LogNamespace,
}
51
52/// Publishes decoded events from the FirehoseRequest to the pipeline
53pub(super) async fn firehose(
54    request_id: String,
55    source_arn: String,
56    request: FirehoseRequest,
57    mut context: Context,
58) -> Result<impl warp::Reply, reject::Rejection> {
59    let log_namespace = context.log_namespace;
60    let events_received = register!(EventsReceived);
61
62    for record in request.records {
63        let bytes = decode_record(&record, context.compression)
64            .with_context(|_| ParseRecordsSnafu {
65                request_id: request_id.clone(),
66            })
67            .map_err(reject::custom)?;
68        context.bytes_received.emit(ByteSize(bytes.len()));
69
70        let mut stream = DecoderFramedRead::new(bytes.as_ref(), context.decoder.clone());
71        loop {
72            match stream.next().await {
73                Some(Ok((mut events, _byte_size))) => {
74                    events_received.emit(CountByteSize(
75                        events.len(),
76                        events.estimated_json_encoded_size_of(),
77                    ));
78
79                    let (batch, receiver) = if context.acknowledgements {
80                        {
81                            let (batch, receiver) = BatchNotifier::new_with_receiver();
82                            (Some(batch), Some(receiver))
83                        }
84                    } else {
85                        (None, None)
86                    };
87
88                    let now = Utc::now();
89                    for event in &mut events {
90                        if let Some(batch) = &batch {
91                            event.add_batch_notifier(batch.clone());
92                        }
93                        if let Event::Log(log) = event {
94                            log_namespace.insert_vector_metadata(
95                                log,
96                                log_schema().source_type_key(),
97                                path!("source_type"),
98                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
99                            );
100                            // This handles the transition from the original timestamp logic. Originally the
101                            // `timestamp_key` was always populated by the `request.timestamp` time.
102                            match log_namespace {
103                                LogNamespace::Vector => {
104                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
105                                    log.insert(
106                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
107                                        request.timestamp,
108                                    );
109                                }
110                                LogNamespace::Legacy => {
111                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
112                                        log.try_insert(
113                                            (PathPrefix::Event, timestamp_key),
114                                            request.timestamp,
115                                        );
116                                    }
117                                }
118                            };
119
120                            log_namespace.insert_source_metadata(
121                                AwsKinesisFirehoseConfig::NAME,
122                                log,
123                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
124                                path!("request_id"),
125                                request_id.to_owned(),
126                            );
127                            log_namespace.insert_source_metadata(
128                                AwsKinesisFirehoseConfig::NAME,
129                                log,
130                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
131                                path!("source_arn"),
132                                source_arn.to_owned(),
133                            );
134
135                            if context.store_access_key
136                                && let Some(access_key) = &request.access_key
137                            {
138                                log.metadata_mut()
139                                    .secrets_mut()
140                                    .insert_secret("aws_kinesis_firehose_access_key", access_key);
141                            }
142                        }
143                    }
144
145                    let count = events.len();
146                    match context.out.send_batch(events).await {
147                        Ok(()) => (),
148                        Err(SendError::Closed) => {
149                            emit!(StreamClosedError { count });
150                            let error = RequestError::ShuttingDown {
151                                request_id: request_id.clone(),
152                            };
153                            warp::reject::custom(error);
154                        }
155                        Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
156                    }
157
158                    drop(batch);
159                    if let Some(receiver) = receiver {
160                        match receiver.await {
161                            BatchStatus::Delivered => Ok(()),
162                            BatchStatus::Rejected => {
163                                Err(warp::reject::custom(RequestError::DeliveryFailed {
164                                    request_id: request_id.clone(),
165                                }))
166                            }
167                            BatchStatus::Errored => {
168                                Err(warp::reject::custom(RequestError::DeliveryErrored {
169                                    request_id: request_id.clone(),
170                                }))
171                            }
172                        }?;
173                    }
174                }
175                Some(Err(error)) => {
176                    // Error is logged by `vector_lib::codecs::Decoder`, no further
177                    // handling is needed here.
178                    if !error.can_continue() {
179                        break;
180                    }
181                }
182                None => break,
183            }
184        }
185    }
186
187    Ok(warp::reply::json(&FirehoseResponse {
188        request_id: request_id.clone(),
189        timestamp: Utc::now(),
190        error_message: None,
191    }))
192}
193
#[derive(Debug, Snafu)]
/// Errors produced while decoding a single Firehose record in `decode_record`.
pub enum RecordDecodeError {
    /// The record payload was not valid base64.
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    /// The base64-decoded payload failed to decompress with the given scheme.
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}
204
205/// Decodes a Firehose record.
206fn decode_record(
207    record: &EncodedFirehoseRecord,
208    compression: Compression,
209) -> Result<Bytes, RecordDecodeError> {
210    let buf = BASE64_STANDARD
211        .decode(record.data.as_bytes())
212        .context(Base64Snafu {})?;
213
214    if buf.is_empty() {
215        return Ok(Bytes::default());
216    }
217
218    match compression {
219        Compression::None => Ok(Bytes::from(buf)),
220        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
221            compression: compression.to_owned(),
222        }),
223        Compression::Auto => {
224            if is_gzip(&buf) {
225                decode_gzip(&buf[..]).or_else(|error| {
226                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
227                        compression: Compression::Gzip,
228                        error
229                    });
230                    Ok(Bytes::from(buf))
231                })
232            } else {
233                // only support gzip for now
234                Ok(Bytes::from(buf))
235            }
236        }
237    }
238}
239
240fn is_gzip(data: &[u8]) -> bool {
241    // The header length of a GZIP file is 10 bytes. The first two bytes of the constant comes from
242    // the GZIP file format specification, which is the fixed member header identification bytes.
243    // The third byte is the compression method, of which only one is defined which is 8 for the
244    // deflate algorithm.
245    //
246    // Reference: https://datatracker.ietf.org/doc/html/rfc1952 Section 2.3
247    data.starts_with(GZIP_MAGIC)
248}
249
250fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
251    let mut decoded = Vec::new();
252
253    let mut gz = MultiGzDecoder::new(data);
254    gz.read_to_end(&mut decoded)?;
255
256    Ok(Bytes::from(decoded))
257}
258
#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        // Plain bytes must not be mistaken for gzip...
        assert!(!is_gzip(CONTENT));

        // ...while the same payload is detected once gzip-compressed.
        let compressed = {
            let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
            gz.write_all(CONTENT).unwrap();
            gz.finish().unwrap()
        };
        assert!(is_gzip(&compressed));
    }
}
277}