vector/sinks/aws_kinesis/firehose/config.rs

1use aws_sdk_firehose::operation::{
2    describe_delivery_stream::DescribeDeliveryStreamError, put_record_batch::PutRecordBatchError,
3};
4use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
5use futures::FutureExt;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::sinks::util::retries::RetryAction;
10use crate::{
11    aws::{create_client, is_retriable_error, ClientBuilder},
12    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
13    sinks::{
14        util::{retries::RetryLogic, BatchConfig, SinkBatchSettings},
15        Healthcheck, VectorSink,
16    },
17};
18
19use super::sink::BatchKinesisRequest;
20use super::{
21    build_sink,
22    record::{KinesisFirehoseClient, KinesisFirehoseRecord},
23    KinesisClient, KinesisError, KinesisRecord, KinesisResponse, KinesisSinkBaseConfig,
24};
25
26#[allow(clippy::large_enum_variant)]
27#[derive(Debug, Snafu)]
28enum HealthcheckError {
29    #[snafu(display("DescribeDeliveryStream failed: {}", source))]
30    DescribeDeliveryStreamFailed {
31        source: SdkError<DescribeDeliveryStreamError, HttpResponse>,
32    },
33    #[snafu(display("Stream name does not match, got {}, expected {}", name, stream_name))]
34    StreamNamesMismatch { name: String, stream_name: String },
35}
36
37pub struct KinesisFirehoseClientBuilder;
38
39impl ClientBuilder for KinesisFirehoseClientBuilder {
40    type Client = KinesisClient;
41
42    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
43        Self::Client::new(config)
44    }
45}
46
// Per-request limits of the Firehose batch API: at most 500 records and 4 MiB of payload.
// https://docs.aws.amazon.com/firehose/latest/dev/limits.html

/// Upper bound, in bytes, on the payload of a single batch request (4 MiB).
pub const MAX_PAYLOAD_SIZE: usize = 4 * 1024 * 1024;
/// Upper bound on the number of records in a single batch request.
pub const MAX_PAYLOAD_EVENTS: usize = 500;
51
52#[derive(Clone, Copy, Debug, Default)]
53pub struct KinesisFirehoseDefaultBatchSettings;
54
55impl SinkBatchSettings for KinesisFirehoseDefaultBatchSettings {
56    const MAX_EVENTS: Option<usize> = Some(MAX_PAYLOAD_EVENTS);
57    const MAX_BYTES: Option<usize> = Some(MAX_PAYLOAD_SIZE);
58    const TIMEOUT_SECS: f64 = 1.0;
59}
60
61/// Configuration for the `aws_kinesis_firehose` sink.
62#[configurable_component(sink(
63    "aws_kinesis_firehose",
64    "Publish logs to AWS Kinesis Data Firehose topics."
65))]
66#[derive(Clone, Debug)]
67pub struct KinesisFirehoseSinkConfig {
68    #[serde(flatten)]
69    pub base: KinesisSinkBaseConfig,
70
71    #[configurable(derived)]
72    #[serde(default)]
73    pub batch: BatchConfig<KinesisFirehoseDefaultBatchSettings>,
74}
75
76impl KinesisFirehoseSinkConfig {
77    async fn healthcheck(self, client: KinesisClient) -> crate::Result<()> {
78        let stream_name = self.base.stream_name;
79
80        let result = client
81            .describe_delivery_stream()
82            .delivery_stream_name(stream_name.clone())
83            .set_exclusive_start_destination_id(None)
84            .limit(1)
85            .send()
86            .await;
87
88        match result {
89            Ok(resp) => {
90                let name = resp
91                    .delivery_stream_description
92                    .map(|x| x.delivery_stream_name)
93                    .unwrap_or_default();
94                if name == stream_name {
95                    Ok(())
96                } else {
97                    Err(HealthcheckError::StreamNamesMismatch { name, stream_name }.into())
98                }
99            }
100            Err(source) => Err(HealthcheckError::DescribeDeliveryStreamFailed { source }.into()),
101        }
102    }
103
104    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
105        create_client::<KinesisFirehoseClientBuilder>(
106            &KinesisFirehoseClientBuilder {},
107            &self.base.auth,
108            self.base.region.region(),
109            self.base.region.endpoint(),
110            proxy,
111            self.base.tls.as_ref(),
112            None,
113        )
114        .await
115    }
116}
117
118#[async_trait::async_trait]
119#[typetag::serde(name = "aws_kinesis_firehose")]
120impl SinkConfig for KinesisFirehoseSinkConfig {
121    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
122        let client = self.create_client(&cx.proxy).await?;
123        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
124
125        let batch_settings = self
126            .batch
127            .validate()?
128            .limit_max_bytes(MAX_PAYLOAD_SIZE)?
129            .limit_max_events(MAX_PAYLOAD_EVENTS)?
130            .into_batcher_settings()?;
131
132        let sink = build_sink::<
133            KinesisFirehoseClient,
134            KinesisRecord,
135            KinesisFirehoseRecord,
136            KinesisError,
137            KinesisRetryLogic,
138        >(
139            &self.base,
140            self.base.partition_key_field.clone(),
141            batch_settings,
142            KinesisFirehoseClient { client },
143            KinesisRetryLogic {
144                retry_partial: self.base.request_retry_partial,
145            },
146        )?;
147
148        Ok((sink, healthcheck))
149    }
150
151    fn input(&self) -> Input {
152        self.base.input()
153    }
154
155    fn acknowledgements(&self) -> &AcknowledgementsConfig {
156        self.base.acknowledgements()
157    }
158}
159
160impl GenerateConfig for KinesisFirehoseSinkConfig {
161    fn generate_config() -> toml::Value {
162        toml::from_str(
163            r#"stream_name = "my-stream"
164            encoding.codec = "json""#,
165        )
166        .unwrap()
167    }
168}
169
170#[derive(Clone, Default)]
171struct KinesisRetryLogic {
172    retry_partial: bool,
173}
174
175impl RetryLogic for KinesisRetryLogic {
176    type Error = SdkError<KinesisError, HttpResponse>;
177    type Request = BatchKinesisRequest<KinesisFirehoseRecord>;
178    type Response = KinesisResponse;
179
180    fn is_retriable_error(&self, error: &Self::Error) -> bool {
181        if let SdkError::ServiceError(inner) = error {
182            if matches!(
183                inner.err(),
184                PutRecordBatchError::ServiceUnavailableException(_)
185            ) {
186                return true;
187            }
188        }
189        is_retriable_error(error)
190    }
191
192    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
193        if response.failure_count > 0 && self.retry_partial {
194            let msg = format!("partial error count {}", response.failure_count);
195            RetryAction::Retry(msg.into())
196        } else {
197            RetryAction::Successful
198        }
199    }
200}