vector/sinks/aws_kinesis/firehose/
record.rs1use aws_sdk_firehose::operation::put_record_batch::PutRecordBatchOutput;
2use aws_smithy_runtime_api::client::result::SdkError;
3use aws_smithy_types::Blob;
4use bytes::Bytes;
5use tracing::Instrument;
6
7use super::{KinesisClient, KinesisError, KinesisRecord, KinesisResponse, Record, SendRecord};
8use crate::sinks::prelude::*;
9
10#[derive(Clone)]
11pub struct KinesisFirehoseRecord {
12 pub record: KinesisRecord,
13}
14
15impl Record for KinesisFirehoseRecord {
16 type T = KinesisRecord;
17
18 fn new(payload_bytes: &Bytes, _partition_key: &str) -> Self {
19 Self {
20 record: KinesisRecord::builder()
21 .data(Blob::new(&payload_bytes[..]))
22 .build()
23 .expect("all builder records specified"),
24 }
25 }
26
27 fn encoded_length(&self) -> usize {
28 let data_len = self.record.data.as_ref().len();
29 data_len.div_ceil(3) * 4 + 3
31 }
32
33 fn get(self) -> Self::T {
34 self.record
35 }
36}
37
38#[derive(Clone)]
39pub struct KinesisFirehoseClient {
40 pub client: KinesisClient,
41}
42
43impl SendRecord for KinesisFirehoseClient {
44 type T = KinesisRecord;
45 type E = KinesisError;
46
47 async fn send(
48 &self,
49 records: Vec<Self::T>,
50 stream_name: String,
51 ) -> Result<
52 KinesisResponse,
53 SdkError<Self::E, aws_smithy_runtime_api::client::orchestrator::HttpResponse>,
54 > {
55 let rec_count = records.len();
56 let total_size = records
57 .iter()
58 .fold(0, |acc, record| acc + record.data().as_ref().len());
59 self.client
60 .put_record_batch()
61 .set_records(Some(records))
62 .delivery_stream_name(stream_name)
63 .send()
64 .instrument(info_span!("request").or_current())
65 .await
66 .map(|output: PutRecordBatchOutput| KinesisResponse {
67 failure_count: output.failed_put_count() as usize,
68 events_byte_size: CountByteSize(rec_count, JsonSize::new(total_size)).into(),
69 #[cfg(feature = "sinks-aws_kinesis_streams")]
70 failed_records: vec![], })
72 }
73}