vector/sources/aws_kinesis_firehose/
handlers.rs

use std::io::Read;

use base64::prelude::{BASE64_STANDARD, Engine as _};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::StreamDecodingError,
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    Compression,
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
};
use crate::{
    SourceSender,
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
};

#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}
/// Publishes decoded events from the `FirehoseRequest` to the pipeline.
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

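        // A record's payload can contain multiple framed events; the configured
        // decoder splits the decoded bytes back into individual events.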
        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));

                    let (batch, receiver) = if context.acknowledgements {
                        let (batch, receiver) = BatchNotifier::new_with_receiver();
                        (Some(batch), Some(receiver))
                    } else {
                        (None, None)
                    };

                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
                            // Handles the transition from the original timestamp logic:
                            // previously, the `timestamp_key` was always populated with
                            // `request.timestamp`.
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key
                                && let Some(access_key) = &request.access_key
                            {
                                log.metadata_mut()
                                    .secrets_mut()
                                    .insert_secret("aws_kinesis_firehose_access_key", access_key);
                            }
                        }
                    }

                    let count = events.len();
                    if let Err(error) = context.out.send_batch(events).await {
                        emit!(StreamClosedError { count });
                        let error = RequestError::ShuttingDown {
                            request_id: request_id.clone(),
                            source: error,
                        };
                        // The source is shutting down; reject the request so the
                        // sender can retry delivery.
                        return Err(reject::custom(error));
                    }

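                    // Drop our handle on the batch notifier so the receiver can
                    // resolve once every event's acknowledgement is finalized.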
                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
                    // Error is logged by `crate::codecs::Decoder`; no further
                    // handling is needed here.
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}

/// Decodes a Firehose record: base64-decodes the payload, then decompresses
/// it according to `compression`.
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                // Only gzip detection is supported for now.
                Ok(Bytes::from(buf))
            }
        }
    }
}

fn is_gzip(data: &[u8]) -> bool {
    // The header of a gzip file is 10 bytes long. The first two bytes of the
    // constant come from the gzip file format specification: they are the fixed
    // member header identification bytes. The third byte is the compression
    // method, of which only one value is defined: 8, for the DEFLATE algorithm.
    //
    // Reference: https://datatracker.ietf.org/doc/html/rfc1952, Section 2.3
    data.starts_with(GZIP_MAGIC)
}

fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

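    // A sketch of a round-trip check for `decode_record`, assuming
    // `EncodedFirehoseRecord` can be constructed here with its base64-encoded
    // `data` field (mirroring how `decode_record` reads `record.data` above).
    // Note that `Compression` in this module is flate2's compression level; the
    // source's own setting is reached as `super::Compression`.
    #[test]
    fn decodes_gzip_and_auto_records() {
        use base64::prelude::{BASE64_STANDARD, Engine as _};

        // Gzip CONTENT, base64-encode it, and decode it back with `Gzip`.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let record = EncodedFirehoseRecord {
            data: BASE64_STANDARD.encode(encoder.finish().unwrap()),
        };
        assert_eq!(
            decode_record(&record, super::Compression::Gzip).unwrap(),
            Bytes::from_static(CONTENT)
        );

        // `Auto` should pass non-gzip payloads through unchanged.
        let plain = EncodedFirehoseRecord {
            data: BASE64_STANDARD.encode(CONTENT),
        };
        assert_eq!(
            decode_record(&plain, super::Compression::Auto).unwrap(),
            Bytes::from_static(CONTENT)
        );
    }
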
    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
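
    // A sketch of why `decode_gzip` uses `MultiGzDecoder`: a single record may
    // carry several concatenated gzip members, and every member should be
    // decoded rather than stopping after the first one.
    #[test]
    fn decodes_multi_member_gzip() {
        let gzip = |content: &[u8]| {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
            encoder.write_all(content).unwrap();
            encoder.finish().unwrap()
        };

        let mut concatenated = gzip(b"first");
        concatenated.extend(gzip(b"second"));

        assert_eq!(
            decode_gzip(&concatenated).unwrap(),
            Bytes::from_static(b"firstsecond")
        );
    }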
}