vector/sinks/aws_kinesis/firehose/
record.rs

1use aws_sdk_firehose::operation::put_record_batch::PutRecordBatchOutput;
2use aws_smithy_runtime_api::client::result::SdkError;
3use aws_smithy_types::Blob;
4use bytes::Bytes;
5use tracing::Instrument;
6
7use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
8use crate::sinks::prelude::*;
9
10#[derive(Clone)]
11pub struct KinesisFirehoseRecord {
12    pub record: KinesisRecord,
13}
14
15impl Record for KinesisFirehoseRecord {
16    type T = KinesisRecord;
17
18    fn new(payload_bytes: &Bytes, _partition_key: &str) -> Self {
19        Self {
20            record: KinesisRecord::builder()
21                .data(Blob::new(&payload_bytes[..]))
22                .build()
23                .expect("all builder records specified"),
24        }
25    }
26
27    fn encoded_length(&self) -> usize {
28        let data_len = self.record.data.as_ref().len();
29        // data is simply base64 encoded, quoted, and comma separated
30        data_len.div_ceil(3) * 4 + 3
31    }
32
33    fn get(self) -> Self::T {
34        self.record
35    }
36}
37
38#[derive(Clone)]
39pub struct KinesisFirehoseClient {
40    pub client: KinesisClient,
41}
42
43impl SendRecord for KinesisFirehoseClient {
44    type T = KinesisRecord;
45    type E = KinesisError;
46
47    async fn send(
48        &self,
49        records: Vec<Self::T>,
50        stream_name: String,
51    ) -> Result<
52        KinesisResponse,
53        SdkError<Self::E, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
54    > {
55        let rec_count = records.len();
56        let total_size = records
57            .iter()
58            .fold(0, |acc, record| acc + record.data().as_ref().len());
59        self.client
60            .put_record_batch()
61            .set_records(Some(records))
62            .delivery_stream_name(stream_name)
63            .send()
64            .instrument(info_span!("request").or_current())
65            .await
66            .map(|output: PutRecordBatchOutput| KinesisResponse {
67                failure_count: output.failed_put_count() as usize,
68                events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(),
69                #[cfg(feature = "sinks-aws_kinesis_streams")]
70                failed_records: vec![], // Firehose doesn't support partial failure retry
71            })
72    }
73}