vector/sources/aws_kinesis_firehose/handlers.rs

use std::io::Read;

use base64::prelude::{BASE64_STANDARD, Engine as _};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::StreamDecodingError,
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
    lookup::{PathPrefix, metadata_path, path},
    source_sender::SendError,
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    Compression,
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
};
use crate::{
    SourceSender,
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
};

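/// Per-source state shared by every Firehose request handler invocation.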
#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}

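/// Handles a single Firehose request: decodes each record, enriches the
/// resulting events with source metadata, forwards them downstream, and
/// (when acknowledgements are enabled) waits for delivery status before
/// responding.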
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

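        // Frame the decompressed payload and decode each frame into events
        // with the configured decoder.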
        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));

                    let (batch, receiver) = if context.acknowledgements {
                        let (batch, receiver) = BatchNotifier::new_with_receiver();
                        (Some(batch), Some(receiver))
                    } else {
                        (None, None)
                    };

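                    // Annotate every decoded event with source metadata:
                    // source type, timestamps, request ID, and source ARN.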
                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key
                                && let Some(access_key) = &request.access_key
                            {
                                log.metadata_mut()
                                    .secrets_mut()
                                    .insert_secret("aws_kinesis_firehose_access_key", access_key);
                            }
                        }
                    }

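                    // Forward the batch downstream; a closed output channel
                    // means the source is shutting down.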
                    let count = events.len();
                    match context.out.send_batch(events).await {
                        Ok(()) => (),
                        Err(SendError::Closed) => {
                            emit!(StreamClosedError { count });
                            let error = RequestError::ShuttingDown {
                                request_id: request_id.clone(),
                            };
                            return Err(reject::custom(error));
                        }
                        Err(SendError::Timeout) => unreachable!("No timeout is configured here"),
                    }

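                    // Release our handle on the batch notifier so the
                    // receiver can resolve once all events are finalized.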
                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

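/// Errors raised while base64-decoding or decompressing a single record.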
#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}

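/// Decodes a Firehose record payload: base64-decodes the data, then
/// decompresses it according to the configured `Compression` mode.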
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
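            // Sniff for the gzip magic bytes; if decompression still fails,
            // emit an internal error event and fall back to passing the raw
            // record bytes through.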
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                Ok(Bytes::from(buf))
            }
        }
    }
}

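/// Returns `true` when the payload begins with the gzip magic bytes.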
fn is_gzip(data: &[u8]) -> bool {
    data.starts_with(GZIP_MAGIC)
}

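/// Decompresses a gzip payload, handling multiple concatenated gzip
/// members via `MultiGzDecoder`.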
fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use std::io::Write as _;

    use flate2::{Compression, write::GzEncoder};

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
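
    #[test]
    fn decodes_concatenated_gzip_members() {
        // Round-trip sketch: decode_gzip uses MultiGzDecoder, so two
        // concatenated gzip members should decode back into the
        // concatenated plaintext.
        let compress = |input: &[u8]| {
            let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
            encoder.write_all(input).unwrap();
            encoder.finish().unwrap()
        };
        let mut compressed = compress(CONTENT);
        compressed.extend(compress(CONTENT));
        let decoded = decode_gzip(&compressed).unwrap();
        assert_eq!(&decoded[..], [CONTENT, CONTENT].concat());
    }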
278}