vector/sources/aws_kinesis_firehose/handlers.rs

use std::io::Read;

use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::Bytes;
use chrono::Utc;
use flate2::read::MultiGzDecoder;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_common::constants::GZIP_MAGIC;
use vector_lib::codecs::StreamDecodingError;
use vector_lib::lookup::{metadata_path, path, PathPrefix};
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    event::BatchNotifier,
    EstimatedJsonEncodedSizeOf,
};
use vector_lib::{
    finalization::AddBatchNotifier,
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
    },
};
use vrl::compiler::SecretTarget;
use warp::reject;

use super::{
    errors::{ParseRecordsSnafu, RequestError},
    models::{EncodedFirehoseRecord, FirehoseRequest, FirehoseResponse},
    Compression,
};
use crate::{
    codecs::Decoder,
    config::log_schema,
    event::{BatchStatus, Event},
    internal_events::{
        AwsKinesisFirehoseAutomaticRecordDecodeError, EventsReceived, StreamClosedError,
    },
    sources::aws_kinesis_firehose::AwsKinesisFirehoseConfig,
    SourceSender,
};
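
/// Shared state for the request handler: decoding options, acknowledgement
/// settings, and the output handle into the pipeline.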
#[derive(Clone)]
pub(super) struct Context {
    pub(super) compression: Compression,
    pub(super) store_access_key: bool,
    pub(super) decoder: Decoder,
    pub(super) acknowledgements: bool,
    pub(super) bytes_received: Registered<BytesReceived>,
    pub(super) out: SourceSender,
    pub(super) log_namespace: LogNamespace,
}
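
/// Handles a Firehose request: decodes each record into events, enriches them
/// with source metadata, forwards them to the pipeline, and replies with the
/// request id on success.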
pub(super) async fn firehose(
    request_id: String,
    source_arn: String,
    request: FirehoseRequest,
    mut context: Context,
) -> Result<impl warp::Reply, reject::Rejection> {
    let log_namespace = context.log_namespace;
    let events_received = register!(EventsReceived);

    for record in request.records {
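        // Each record arrives base64-encoded and possibly compressed; reject
        // the whole request if any record cannot be decoded.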
        let bytes = decode_record(&record, context.compression)
            .with_context(|_| ParseRecordsSnafu {
                request_id: request_id.clone(),
            })
            .map_err(reject::custom)?;
        context.bytes_received.emit(ByteSize(bytes.len()));

        let mut stream = FramedRead::new(bytes.as_ref(), context.decoder.clone());
        loop {
            match stream.next().await {
                Some(Ok((mut events, _byte_size))) => {
                    events_received.emit(CountByteSize(
                        events.len(),
                        events.estimated_json_encoded_size_of(),
                    ));
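
                    // With end-to-end acknowledgements enabled, attach a batch
                    // notifier so the reply to Firehose reflects delivery status.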
                    let (batch, receiver) = if context.acknowledgements {
                        let (batch, receiver) = BatchNotifier::new_with_receiver();
                        (Some(batch), Some(receiver))
                    } else {
                        (None, None)
                    };

                    let now = Utc::now();
                    for event in &mut events {
                        if let Some(batch) = &batch {
                            event.add_batch_notifier(batch.clone());
                        }
                        if let Event::Log(log) = event {
                            log_namespace.insert_vector_metadata(
                                log,
                                log_schema().source_type_key(),
                                path!("source_type"),
                                Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
                            );
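                            // The Vector namespace stores both the ingest time and the
                            // Firehose request timestamp as metadata; the Legacy namespace
                            // writes the request timestamp into the event's timestamp
                            // field only if it is not already set.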
                            match log_namespace {
                                LogNamespace::Vector => {
                                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
                                    log.insert(
                                        metadata_path!(AwsKinesisFirehoseConfig::NAME, "timestamp"),
                                        request.timestamp,
                                    );
                                }
                                LogNamespace::Legacy => {
                                    if let Some(timestamp_key) = log_schema().timestamp_key() {
                                        log.try_insert(
                                            (PathPrefix::Event, timestamp_key),
                                            request.timestamp,
                                        );
                                    }
                                }
                            };

                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("request_id"))),
                                path!("request_id"),
                                request_id.to_owned(),
                            );
                            log_namespace.insert_source_metadata(
                                AwsKinesisFirehoseConfig::NAME,
                                log,
                                Some(LegacyKey::InsertIfEmpty(path!("source_arn"))),
                                path!("source_arn"),
                                source_arn.to_owned(),
                            );

                            if context.store_access_key {
                                if let Some(access_key) = &request.access_key {
                                    log.metadata_mut().secrets_mut().insert_secret(
                                        "aws_kinesis_firehose_access_key",
                                        access_key,
                                    );
                                }
                            }
                        }
                    }

                    let count = events.len();
                    if let Err(error) = context.out.send_batch(events).await {
                        emit!(StreamClosedError { count });
                        let error = RequestError::ShuttingDown {
                            request_id: request_id.clone(),
                            source: error,
                        };
                        return Err(warp::reject::custom(error));
                    }
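
                    // Drop the local batch handle so the receiver below resolves
                    // once every event in the batch has been acknowledged.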
                    drop(batch);
                    if let Some(receiver) = receiver {
                        match receiver.await {
                            BatchStatus::Delivered => Ok(()),
                            BatchStatus::Rejected => {
                                Err(warp::reject::custom(RequestError::DeliveryFailed {
                                    request_id: request_id.clone(),
                                }))
                            }
                            BatchStatus::Errored => {
                                Err(warp::reject::custom(RequestError::DeliveryErrored {
                                    request_id: request_id.clone(),
                                }))
                            }
                        }?;
                    }
                }
                Some(Err(error)) => {
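                    // Recoverable decode errors leave the stream usable; only
                    // stop reading this record when the error is unrecoverable.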
                    if !error.can_continue() {
                        break;
                    }
                }
                None => break,
            }
        }
    }

    Ok(warp::reply::json(&FirehoseResponse {
        request_id: request_id.clone(),
        timestamp: Utc::now(),
        error_message: None,
    }))
}

#[derive(Debug, Snafu)]
pub enum RecordDecodeError {
    #[snafu(display("Could not base64 decode request data: {}", source))]
    Base64 { source: base64::DecodeError },
    #[snafu(display("Could not decompress request data as {}: {}", compression, source))]
    Decompression {
        source: std::io::Error,
        compression: Compression,
    },
}
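
/// Decodes a single Firehose record: base64-decodes the payload, then
/// decompresses it according to the configured compression setting.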
fn decode_record(
    record: &EncodedFirehoseRecord,
    compression: Compression,
) -> Result<Bytes, RecordDecodeError> {
    let buf = BASE64_STANDARD
        .decode(record.data.as_bytes())
        .context(Base64Snafu {})?;

    if buf.is_empty() {
        return Ok(Bytes::default());
    }

    match compression {
        Compression::None => Ok(Bytes::from(buf)),
        Compression::Gzip => decode_gzip(&buf[..]).with_context(|_| DecompressionSnafu {
            compression: compression.to_owned(),
        }),
        Compression::Auto => {
            if is_gzip(&buf) {
                decode_gzip(&buf[..]).or_else(|error| {
                    emit!(AwsKinesisFirehoseAutomaticRecordDecodeError {
                        compression: Compression::Gzip,
                        error
                    });
                    Ok(Bytes::from(buf))
                })
            } else {
                Ok(Bytes::from(buf))
            }
        }
    }
}
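
// A gzip stream begins with the magic bytes 0x1f 0x8b (GZIP_MAGIC).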
fn is_gzip(data: &[u8]) -> bool {
    data.starts_with(GZIP_MAGIC)
}
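
/// Decompresses a gzip payload; `MultiGzDecoder` also handles payloads made of
/// multiple concatenated gzip members.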
fn decode_gzip(data: &[u8]) -> std::io::Result<Bytes> {
    let mut decoded = Vec::new();

    let mut gz = MultiGzDecoder::new(data);
    gz.read_to_end(&mut decoded)?;

    Ok(Bytes::from(decoded))
}

#[cfg(test)]
mod tests {
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write as _;

    use super::*;

    const CONTENT: &[u8] = b"Example";

    #[test]
    fn correctly_detects_gzipped_content() {
        assert!(!is_gzip(CONTENT));
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert!(is_gzip(&compressed));
    }
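
    // Round-trip check: gzip-compressed content decodes back to the original bytes.
    #[test]
    fn decodes_gzipped_content() {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(CONTENT).unwrap();
        let compressed = encoder.finish().unwrap();
        assert_eq!(decode_gzip(&compressed).unwrap().as_ref(), CONTENT);
    }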
}