vector/sources/aws_kinesis_firehose/handlers.rs

use std::io::Read;

use base64::prelude::{BASE64_STANDARD, Engine as _};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::StreamDecodingError,
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    Compression,
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
};
use crate::{
    SourceSender,
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
};

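/// Per-request state shared with the warp handler for every incoming
/// Firehose request.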
#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}

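/// Handles a Firehose request: decodes each record, enriches the
/// resulting events with source metadata, forwards them to the
/// pipeline, and optionally waits for end-to-end acknowledgement
/// before replying.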
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

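        // Each record is decoded independently: frame the decoded bytes
        // with the configured decoder and drain the resulting stream.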
        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));

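                    // With end-to-end acknowledgements enabled, attach a batch
                    // notifier to every event so delivery status can be awaited
                    // before responding to Firehose.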
                    let (batch, receiver) = if context.acknowledgements {
                        let (batch, receiver) = BatchNotifier::new_with_receiver();
                        (Some(batch), Some(receiver))
                    } else {
                        (None, None)
                    };

                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key
                                && let Some(access_key) = &request.access_key
                            {
                                log.metadata_mut()
                                    .secrets_mut()
                                    .insert_secret("aws_kinesis_firehose_access_key", access_key);
                            }
                        }
                    }

                    let count = events.len();
                    if let Err(error) = context.out.send_batch(events).await {
                        emit!(StreamClosedError { count });
                        let error = RequestError::ShuttingDown {
                            request_id: request_id.clone(),
                            source: error,
                        };
                        // The source is shutting down and can no longer accept
                        // events, so reject the request.
                        return Err(reject::custom(error));
                    }

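                    // Drop our clone of the notifier so the receiver can
                    // resolve once all events in the batch report a status.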
                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
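                    // Decode failures are reported by the decoder itself; only
                    // an unrecoverable error stops reading this record.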
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}

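/// Decodes a Firehose record: base64-decodes the payload, then
/// decompresses it according to the configured `Compression`. With
/// `Compression::Auto`, gzip is detected via its magic bytes, and a
/// failed decompression falls back to returning the raw payload.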
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                Ok(Bytes::from(buf))
            }
        }
    }
}

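/// Returns true if the buffer starts with the two-byte gzip magic
/// number (`0x1f 0x8b`).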
fn is_gzip(data: &[u8]) -> bool {
    data.starts_with(GZIP_MAGIC)
}

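/// Decompresses a gzip payload, accepting multi-member streams
/// (several gzip blobs concatenated together) via `MultiGzDecoder`.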
fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
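
    // Companion round-trip sketch for `decode_gzip`: compress CONTENT
    // with the same encoder used above and verify the original bytes
    // come back out.
    #[test]
    fn decodes_gzipped_content() {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        let decoded = decode_gzip(&compressed).unwrap();
        assert_eq!(&decoded[..], CONTENT);
    }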
}