vector/sinks/aws_kinesis/firehose/
record.rs1use aws_sdk_firehose::operation::put_record_batch::PutRecordBatchOutput;
2use aws_smithy_runtime_api::client::result::SdkError;
3use aws_smithy_types::Blob;
4use bytes::Bytes;
5use tracing::Instrument;
6
7use crate::sinks::prelude::*;
8
9use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
10
11#[derive(Clone)]
12pub struct KinesisFirehoseRecord {
13 pub record: KinesisRecord,
14}
15
16impl Record for KinesisFirehoseRecord {
17 type T = KinesisRecord;
18
19 fn new(payload_bytes: &Bytes, _partition_key: &str) -> Self {
20 Self {
21 record: KinesisRecord::builder()
22 .data(Blob::new(&payload_bytes[..]))
23 .build()
24 .expect("all builder records specified"),
25 }
26 }
27
28 fn encoded_length(&self) -> usize {
29 let data_len = self.record.data.as_ref().len();
30 data_len.div_ceil(3) * 4 + 3
32 }
33
34 fn get(self) -> Self::T {
35 self.record
36 }
37}
38
39#[derive(Clone)]
40pub struct KinesisFirehoseClient {
41 pub client: KinesisClient,
42}
43
44impl SendRecord for KinesisFirehoseClient {
45 type T = KinesisRecord;
46 type E = KinesisError;
47
48 async fn send(
49 &self,
50 records: Vec<Self::T>,
51 stream_name: String,
52 ) -> Result<
53 KinesisResponse,
54 SdkError<Self::E, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
55 > {
56 let rec_count = records.len();
57 let total_size = records
58 .iter()
59 .fold(0, |acc, record| acc + record.data().as_ref().len());
60 self.client
61 .put_record_batch()
62 .set_records(Some(records))
63 .delivery_stream_name(stream_name)
64 .send()
65 .instrument(info_span!("request").or_current())
66 .await
67 .map(|output: PutRecordBatchOutput| KinesisResponse {
68 failure_count: output.failed_put_count() as usize,
69 events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(),
70 })
71 }
72}