vector/sources/aws_kinesis_firehose/handlers.rs

use std::io::Read;

use base64::prelude::{BASE64_STANDARD, Engine as _};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::StreamDecodingError,
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
    source_sender::SendError,
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    Compression,
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
};
use crate::{
    SourceSender,
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
};

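/// Configuration and shared handles used by the `firehose` request handler.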
#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}

/// Publishes decoded events from the FirehoseRequest to the pipeline
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

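        // Frame and decode the record payload into events using the configured decoder.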
        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));

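                    // When end-to-end acknowledgements are enabled, attach a batch notifier so the
                    // delivery status of these events can be awaited before responding to Firehose.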
                    let (batch, receiver) = if context.acknowledgements {
                        let (batch, receiver) = BatchNotifier::new_with_receiver();
                        (Some(batch), Some(receiver))
                    } else {
                        (None, None)
                    };

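                    // Enrich each event with source metadata: source type, timestamps, the
                    // Firehose request ID, and the source ARN.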
                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
                            // This handles the transition from the original timestamp logic, where
                            // the `timestamp_key` was always populated with the `request.timestamp` value.
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key
                                && let Some(access_key) = &request.access_key
                            {
                                log.metadata_mut()
                                    .secrets_mut()
                                    .insert_secret("aws_kinesis_firehose_access_key", access_key);
                            }
                        }
                    }

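                    // Forward the batch to the pipeline; a closed output means Vector is shutting down.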
                    let count = events.len();
                    match context.out.send_batch(events).await {
                        Ok(()) => (),
                        Err(SendError::Closed) => {
                            emit!(StreamClosedError { count });
                            let error = RequestError::ShuttingDown {
                                request_id: request_id.clone(),
                            };
                            return Err(warp::reject::custom(error));
                        }
                        Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
                    }

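                    // Drop our copy of the notifier, then wait for the delivery status (when
                    // acknowledgements are enabled) before acknowledging the request.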
                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
                    // The error is logged by `crate::codecs::Decoder`; no further
                    // handling is needed here.
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

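    // Every record was processed successfully; acknowledge the request.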
    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}

/// Decodes a Firehose record: base64-decodes the payload, then decompresses it
/// according to the configured `Compression`.
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                // Only gzip is detected automatically for now; pass anything else through unchanged.
                Ok(Bytes::from(buf))
            }
        }
    }
}

fn is_gzip(data: &[u8]) -> bool {
    // The gzip member header is 10 bytes long. The first two bytes of the constant come from the
    // gzip file format specification and are the fixed member header identification bytes. The
    // third byte is the compression method, of which only one value is defined: 8, for the
    // DEFLATE algorithm.
    //
    // Reference: https://datatracker.ietf.org/doc/html/rfc1952, Section 2.3
    data.starts_with(GZIP_MAGIC)
}

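/// Decompresses gzip data, handling payloads made up of multiple concatenated gzip members.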
fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
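
    // A round-trip sanity check for the gzip helper used by `decode_record`:
    // gzip-compressed content should decode back to the original bytes.
    #[test]
    fn decodes_gzipped_content() {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        let decoded = decode_gzip(&compressed).expect("decoding gzipped content should succeed");
        assert_eq!(decoded, Bytes::from_static(CONTENT));
    }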
}