vector/sinks/aws_kinesis/firehose/
config.rs1use aws_sdk_firehose::operation::{
2 describe_delivery_stream::DescribeDeliveryStreamError, put_record_batch::PutRecordBatchError,
3};
4use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
5use futures::FutureExt;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::sinks::util::retries::RetryAction;
10use crate::{
11 aws::{create_client, is_retriable_error, ClientBuilder},
12 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
13 sinks::{
14 util::{retries::RetryLogic, BatchConfig, SinkBatchSettings},
15 Healthcheck, VectorSink,
16 },
17};
18
19use super::sink::BatchKinesisRequest;
20use super::{
21 build_sink,
22 record::{KinesisFirehoseClient, KinesisFirehoseRecord},
23 KinesisClient, KinesisError, KinesisRecord, KinesisResponse, KinesisSinkBaseConfig,
24};
25
26#[allow(clippy::large_enum_variant)]
27#[derive(Debug, Snafu)]
28enum HealthcheckError {
29 #[snafu(display("DescribeDeliveryStream failed: {}", source))]
30 DescribeDeliveryStreamFailed {
31 source: SdkError<DescribeDeliveryStreamError, HttpResponse>,
32 },
33 #[snafu(display("Stream name does not match, got {}, expected {}", name, stream_name))]
34 StreamNamesMismatch { name: String, stream_name: String },
35}
36
37pub struct KinesisFirehoseClientBuilder;
38
39impl ClientBuilder for KinesisFirehoseClientBuilder {
40 type Client = KinesisClient;
41
42 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
43 Self::Client::new(config)
44 }
45}
46
/// Upper bound, in bytes, for a single batch (4 MiB).
///
/// Used both as the default batch size and as the hard cap applied to
/// user-supplied batch settings when the sink is built.
// NOTE(review): presumably mirrors the Firehose `PutRecordBatch` request-size
// quota — confirm against the AWS limits documentation.
pub const MAX_PAYLOAD_SIZE: usize = 4 * 1024 * 1024;

/// Upper bound on the number of events in a single batch.
///
/// Used both as the default event count and as the hard cap applied to
/// user-supplied batch settings when the sink is built.
// NOTE(review): presumably mirrors the Firehose `PutRecordBatch` record-count
// quota — confirm against the AWS limits documentation.
pub const MAX_PAYLOAD_EVENTS: usize = 500;
51
52#[derive(Clone, Copy, Debug, Default)]
53pub struct KinesisFirehoseDefaultBatchSettings;
54
55impl SinkBatchSettings for KinesisFirehoseDefaultBatchSettings {
56 const MAX_EVENTS: Option<usize> = Some(MAX_PAYLOAD_EVENTS);
57 const MAX_BYTES: Option<usize> = Some(MAX_PAYLOAD_SIZE);
58 const TIMEOUT_SECS: f64 = 1.0;
59}
60
61#[configurable_component(sink(
63 "aws_kinesis_firehose",
64 "Publish logs to AWS Kinesis Data Firehose topics."
65))]
66#[derive(Clone, Debug)]
67pub struct KinesisFirehoseSinkConfig {
68 #[serde(flatten)]
69 pub base: KinesisSinkBaseConfig,
70
71 #[configurable(derived)]
72 #[serde(default)]
73 pub batch: BatchConfig<KinesisFirehoseDefaultBatchSettings>,
74}
75
76impl KinesisFirehoseSinkConfig {
77 async fn healthcheck(self, client: KinesisClient) -> crate::Result<()> {
78 let stream_name = self.base.stream_name;
79
80 let result = client
81 .describe_delivery_stream()
82 .delivery_stream_name(stream_name.clone())
83 .set_exclusive_start_destination_id(None)
84 .limit(1)
85 .send()
86 .await;
87
88 match result {
89 Ok(resp) => {
90 let name = resp
91 .delivery_stream_description
92 .map(|x| x.delivery_stream_name)
93 .unwrap_or_default();
94 if name == stream_name {
95 Ok(())
96 } else {
97 Err(HealthcheckError::StreamNamesMismatch { name, stream_name }.into())
98 }
99 }
100 Err(source) => Err(HealthcheckError::DescribeDeliveryStreamFailed { source }.into()),
101 }
102 }
103
104 pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
105 create_client::<KinesisFirehoseClientBuilder>(
106 &KinesisFirehoseClientBuilder {},
107 &self.base.auth,
108 self.base.region.region(),
109 self.base.region.endpoint(),
110 proxy,
111 self.base.tls.as_ref(),
112 None,
113 )
114 .await
115 }
116}
117
118#[async_trait::async_trait]
119#[typetag::serde(name = "aws_kinesis_firehose")]
120impl SinkConfig for KinesisFirehoseSinkConfig {
121 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
122 let client = self.create_client(&cx.proxy).await?;
123 let healthcheck = self.clone().healthcheck(client.clone()).boxed();
124
125 let batch_settings = self
126 .batch
127 .validate()?
128 .limit_max_bytes(MAX_PAYLOAD_SIZE)?
129 .limit_max_events(MAX_PAYLOAD_EVENTS)?
130 .into_batcher_settings()?;
131
132 let sink = build_sink::<
133 KinesisFirehoseClient,
134 KinesisRecord,
135 KinesisFirehoseRecord,
136 KinesisError,
137 KinesisRetryLogic,
138 >(
139 &self.base,
140 self.base.partition_key_field.clone(),
141 batch_settings,
142 KinesisFirehoseClient { client },
143 KinesisRetryLogic {
144 retry_partial: self.base.request_retry_partial,
145 },
146 )?;
147
148 Ok((sink, healthcheck))
149 }
150
151 fn input(&self) -> Input {
152 self.base.input()
153 }
154
155 fn acknowledgements(&self) -> &AcknowledgementsConfig {
156 self.base.acknowledgements()
157 }
158}
159
160impl GenerateConfig for KinesisFirehoseSinkConfig {
161 fn generate_config() -> toml::Value {
162 toml::from_str(
163 r#"stream_name = "my-stream"
164 encoding.codec = "json""#,
165 )
166 .unwrap()
167 }
168}
169
/// Retry policy for requests issued by this sink.
#[derive(Clone, Default)]
struct KinesisRetryLogic {
    /// When `true`, a response reporting one or more failed records is retried;
    /// when `false`, such a response is treated as successful.
    retry_partial: bool,
}
174
175impl RetryLogic for KinesisRetryLogic {
176 type Error = SdkError<KinesisError, HttpResponse>;
177 type Request = BatchKinesisRequest<KinesisFirehoseRecord>;
178 type Response = KinesisResponse;
179
180 fn is_retriable_error(&self, error: &Self::Error) -> bool {
181 if let SdkError::ServiceError(inner) = error {
182 if matches!(
183 inner.err(),
184 PutRecordBatchError::ServiceUnavailableException(_)
185 ) {
186 return true;
187 }
188 }
189 is_retriable_error(error)
190 }
191
192 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
193 if response.failure_count > 0 && self.retry_partial {
194 let msg = format!("partial error count {}", response.failure_count);
195 RetryAction::Retry(msg.into())
196 } else {
197 RetryAction::Successful
198 }
199 }
200}