vector/sinks/aws_kinesis/firehose/
record.rs

1use aws_sdk_firehose::operation::put_record_batch::PutRecordBatchOutput;
2use aws_smithy_runtime_api::client::result::SdkError;
3use aws_smithy_types::Blob;
4use bytes::Bytes;
5use tracing::Instrument;
6
7use crate::sinks::prelude::*;
8
9use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
10
11#[derive(Clone)]
12pub struct KinesisFirehoseRecord {
13    pub record: KinesisRecord,
14}
15
16impl Record for KinesisFirehoseRecord {
17    type T = KinesisRecord;
18
19    fn new(payload_bytes: &Bytes, _partition_key: &str) -> Self {
20        Self {
21            record: KinesisRecord::builder()
22                .data(Blob::new(&payload_bytes[..]))
23                .build()
24                .expect("all builder records specified"),
25        }
26    }
27
28    fn encoded_length(&self) -> usize {
29        let data_len = self.record.data.as_ref().len();
30        // data is simply base64 encoded, quoted, and comma separated
31        data_len.div_ceil(3) * 4 + 3
32    }
33
34    fn get(self) -> Self::T {
35        self.record
36    }
37}
38
39#[derive(Clone)]
40pub struct KinesisFirehoseClient {
41    pub client: KinesisClient,
42}
43
44impl SendRecord for KinesisFirehoseClient {
45    type T = KinesisRecord;
46    type E = KinesisError;
47
48    async fn send(
49        &self,
50        records: Vec<Self::T>,
51        stream_name: String,
52    ) -> Result<
53        KinesisResponse,
54        SdkError<Self::E, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
55    > {
56        let rec_count = records.len();
57        let total_size = records
58            .iter()
59            .fold(0, |acc, record| acc + record.data().as_ref().len());
60        self.client
61            .put_record_batch()
62            .set_records(Some(records))
63            .delivery_stream_name(stream_name)
64            .send()
65            .instrument(info_span!("request").or_current())
66            .await
67            .map(|output: PutRecordBatchOutput| KinesisResponse {
68                failure_count: output.failed_put_count() as usize,
69                events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(),
70            })
71    }
72}