// vector/sources/aws_kinesis_firehose/handlers.rs
use std::io::Read;
2
3use base64::prelude::{BASE64_STANDARD, Engine as _};
4use bytes::Bytes;
5use chrono::Utc;
6use flate2::read::MultiGzDecoder;
7use futures::StreamExt;
8use snafu::{ResultExt, Snafu};
9use vector_common::constants::GZIP_MAGIC;
10use vector_lib::{
11 EstimatedJsonEncodedSizeOf,
12 codecs::{DecoderFramedRead, StreamDecodingError},
13 config::{LegacyKey, LogNamespace},
14 event::BatchNotifier,
15 finalization::AddBatchNotifier,
16 internal_event::{
17 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
18 },
19 lookup::{PathPrefix, metadata_path, path},
20 source_sender::SendError,
21};
22use vrl::compiler::SecretTarget;
23use warp::reject;
24
25use super::{
26 Compression,
27 errors::{ParseRecordsSnafu, RequestError},
28 models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
29};
30use crate::{
31 SourceSender,
32 codecs::Decoder,
33 config::log_schema,
34 event::{BatchStatus, Event},
35 internal_events::{
36 AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
37 },
38 sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
39};
40
/// Per-source state cloned into every request handler invocation.
#[derive(Clone)]
pub(super) struct Context {
    // How record payloads are decompressed: None, Gzip, or Auto-detect.
    pub(super) compression: Compression,
    // When true, the request's AWS access key is attached to each log
    // event as a secret.
    pub(super) store_access_key: bool,
    // Framing + deserialization applied to each decoded record payload.
    pub(super) decoder: Decoder,
    // Whether to hold the HTTP response until delivery status is known.
    pub(super) acknowledgements: bool,
    // Pre-registered handle for the bytes-received internal metric.
    pub(super) bytes_received: Registered<BytesReceived>,
    // Channel into the topology where decoded events are sent.
    pub(super) out: SourceSender,
    // Controls Legacy vs Vector metadata placement on emitted logs.
    pub(super) log_namespace: LogNamespace,
}
51
52pub(super) async fn firehose(
54 request_id: String,
55 source_arn: String,
56 request: FirehoseRequest,
57 mut context: Context,
58) -> Result<impl warp::Reply, reject::Rejection> {
59 let log_namespace = context.log_namespace;
60 let events_received = register!(EventsReceived);
61
62 for record in request.records {
63 let bytes = decode_record(&record, context.compression)
64 .with_context(|_| ParseRecordsSnafu {
65 request_id: request_id.clone(),
66 })
67 .map_err(reject::custom)?;
68 context.bytes_received.emit(ByteSize(bytes.len()));
69
70 let mut stream = DecoderFramedRead::new(bytes.as_ref(), context.decoder.clone());
71 loop {
72 match stream.next().await {
73 Some(Ok((mut events, _byte_size))) => {
74 events_received.emit(CountByteSize(
75 events.len(),
76 events.estimated_json_encoded_size_of(),
77 ));
78
79 let (batch, receiver) = if context.acknowledgements {
80 {
81 let (batch, receiver) = BatchNotifier::new_with_receiver();
82 (Some(batch), Some(receiver))
83 }
84 } else {
85 (None, None)
86 };
87
88 let now = Utc::now();
89 for event in &mut events {
90 if let Some(batch) = &batch {
91 event.add_batch_notifier(batch.clone());
92 }
93 if let Event::Log(log) = event {
94 log_namespace.insert_vector_metadata(
95 log,
96 log_schema().source_type_key(),
97 path!("source_type"),
98 Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
99 );
100 match log_namespace {
103 LogNamespace::Vector => {
104 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
105 log.insert(
106 metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
107 request.timestamp,
108 );
109 }
110 LogNamespace::Legacy => {
111 if let Some(timestamp_key) = log_schema().timestamp_key() {
112 log.try_insert(
113 (PathPrefix::Event, timestamp_key),
114 request.timestamp,
115 );
116 }
117 }
118 };
119
120 log_namespace.insert_source_metadata(
121 AwsKinesisFirehoseConfig::NAME,
122 log,
123 Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
124 path!("request_id"),
125 request_id.to_owned(),
126 );
127 log_namespace.insert_source_metadata(
128 AwsKinesisFirehoseConfig::NAME,
129 log,
130 Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
131 path!("source_arn"),
132 source_arn.to_owned(),
133 );
134
135 if context.store_access_key
136 && let Some(access_key) = &request.access_key
137 {
138 log.metadata_mut()
139 .secrets_mut()
140 .insert_secret("aws_kinesis_firehose_access_key", access_key);
141 }
142 }
143 }
144
145 let count = events.len();
146 match context.out.send_batch(events).await {
147 Ok(()) => (),
148 Err(SendError::Closed) => {
149 emit!(StreamClosedError { count });
150 let error = RequestError::ShuttingDown {
151 request_id: request_id.clone(),
152 };
153 warp::reject::custom(error);
154 }
155 Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
156 }
157
158 drop(batch);
159 if let Some(receiver) = receiver {
160 match receiver.await {
161 BatchStatus::Delivered => Ok(()),
162 BatchStatus::Rejected => {
163 Err(warp::reject::custom(RequestError::DeliveryFailed {
164 request_id: request_id.clone(),
165 }))
166 }
167 BatchStatus::Errored => {
168 Err(warp::reject::custom(RequestError::DeliveryErrored {
169 request_id: request_id.clone(),
170 }))
171 }
172 }?;
173 }
174 }
175 Some(Err(error)) => {
176 if !error.can_continue() {
179 break;
180 }
181 }
182 None => break,
183 }
184 }
185 }
186
187 Ok(warp::reply::json(&FirehoseResponse {
188 request_id: request_id.clone(),
189 timestamp: Utc::now(),
190 error_message: None,
191 }))
192}
193
/// Failure modes when turning an encoded Firehose record into raw bytes.
#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    /// The record payload was not valid standard base64.
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    /// The payload decoded, but decompressing it with the named codec failed.
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}
204
205fn decode_record(
207 record: &EncodedFirehoseRecord,
208 compression: Compression,
209) -> Result<Bytes, RecordDecodeError> {
210 let buf = BASE64_STANDARD
211 .decode(record.data.as_bytes())
212 .context(Base64Snafu {})?;
213
214 if buf.is_empty() {
215 return Ok(Bytes::default());
216 }
217
218 match compression {
219 Compression::None => Ok(Bytes::from(buf)),
220 Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
221 compression: compression.to_owned(),
222 }),
223 Compression::Auto => {
224 if is_gzip(&buf) {
225 decode_gzip(&buf[..]).or_else(|error| {
226 emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
227 compression: Compression::Gzip,
228 error
229 });
230 Ok(Bytes::from(buf))
231 })
232 } else {
233 Ok(Bytes::from(buf))
235 }
236 }
237 }
238}
239
240fn is_gzip(data: &[u8]) -> bool {
241 data.starts_with(GZIP_MAGIC)
248}
249
250fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
251 let mut decoded = Vec::new();
252
253 let mut gz = MultiGzDecoder::new(data);
254 gz.read_to_end(&mut decoded)?;
255
256 Ok(Bytes::from(decoded))
257}
258
#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        // Plain bytes must not be mistaken for gzip.
        assert!(!is_gzip(CONTENT));

        // Round-trip through an encoder and confirm the magic header
        // is recognized.
        let compressed = {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
            encoder.write_all(CONTENT).unwrap();
            encoder.finish().unwrap()
        };
        assert!(is_gzip(&compressed));
    }
}