vector/sinks/aws_kinesis/streams/
config.rs

1use aws_sdk_kinesis::operation::describe_stream::DescribeStreamError;
2use aws_sdk_kinesis::operation::put_records::PutRecordsError;
3use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
4use futures::FutureExt;
5use snafu::Snafu;
6use vector_lib::configurable::{component::GenerateConfig, configurable_component};
7
8use crate::sinks::util::retries::RetryAction;
9use crate::{
10    aws::{create_client, is_retriable_error, ClientBuilder},
11    config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
12    sinks::{
13        util::{retries::RetryLogic, BatchConfig, SinkBatchSettings},
14        Healthcheck, VectorSink,
15    },
16};
17
18use super::sink::BatchKinesisRequest;
19use super::{
20    build_sink,
21    record::{KinesisStreamClient, KinesisStreamRecord},
22    KinesisClient, KinesisError, KinesisRecord, KinesisResponse, KinesisSinkBaseConfig,
23};
24
25#[allow(clippy::large_enum_variant)]
26#[derive(Debug, Snafu)]
27enum HealthcheckError {
28    #[snafu(display("DescribeStream failed: {}", source))]
29    DescribeStreamFailed {
30        source: SdkError<DescribeStreamError, HttpResponse>,
31    },
32    #[snafu(display("Stream names do not match, got {}, expected {}", name, stream_name))]
33    StreamNamesMismatch { name: String, stream_name: String },
34}
35
36pub struct KinesisClientBuilder;
37
38impl ClientBuilder for KinesisClientBuilder {
39    type Client = KinesisClient;
40
41    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
42        KinesisClient::new(config)
43    }
44}
45
/// Maximum number of bytes packed into a single batched request.
///
/// NOTE(review): AWS documents a 5 MiB cap per `PutRecords` request; this decimal value
/// appears to be a deliberately conservative approximation of it — confirm.
pub const MAX_PAYLOAD_SIZE: usize = 5 * 1_000_000;

/// Maximum number of records packed into a single batched request.
pub const MAX_PAYLOAD_EVENTS: usize = 500;
48
49#[derive(Clone, Copy, Debug, Default)]
50pub struct KinesisDefaultBatchSettings;
51
52impl SinkBatchSettings for KinesisDefaultBatchSettings {
53    const MAX_EVENTS: Option<usize> = Some(MAX_PAYLOAD_EVENTS);
54    const MAX_BYTES: Option<usize> = Some(MAX_PAYLOAD_SIZE);
55    const TIMEOUT_SECS: f64 = 1.0;
56}
57
// Presumably `configurable_component` derives this sink's configuration schema and component
// metadata from the `///` doc comment and the description string below, so both are treated
// as user-facing text. Only plain `//` comments are used in this block for that reason.
// NOTE(review): Kinesis Data Streams has streams/shards, not "topics"; consider rewording the
// description below, deliberately, since it is part of the generated component metadata.
/// Configuration for the `aws_kinesis_streams` sink.
#[configurable_component(sink(
    "aws_kinesis_streams",
    "Publish logs to AWS Kinesis Streams topics."
))]
#[derive(Clone, Debug)]
pub struct KinesisStreamsSinkConfig {
    // Settings shared with the other Kinesis sinks. Flattened, so its keys (for example
    // `stream_name` and `partition_key_field`) sit at the top level of this sink's config.
    #[serde(flatten)]
    pub base: KinesisSinkBaseConfig,

    // Batching limits. When omitted, `BatchConfig`'s default is used (presumably seeded from
    // `KinesisDefaultBatchSettings`). `build` additionally clamps these to
    // `MAX_PAYLOAD_SIZE` / `MAX_PAYLOAD_EVENTS`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<KinesisDefaultBatchSettings>,
}
72
73impl KinesisStreamsSinkConfig {
74    async fn healthcheck(self, client: KinesisClient) -> crate::Result<()> {
75        let stream_name = self.base.stream_name;
76
77        let describe_result = client
78            .describe_stream()
79            .stream_name(stream_name.clone())
80            .set_exclusive_start_shard_id(None)
81            .limit(1)
82            .send()
83            .await;
84
85        match describe_result {
86            Ok(resp) => {
87                let name = resp
88                    .stream_description
89                    .map(|x| x.stream_name)
90                    .unwrap_or_default();
91                if name == stream_name {
92                    Ok(())
93                } else {
94                    Err(HealthcheckError::StreamNamesMismatch { name, stream_name }.into())
95                }
96            }
97            Err(source) => Err(HealthcheckError::DescribeStreamFailed { source }.into()),
98        }
99    }
100
101    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
102        create_client::<KinesisClientBuilder>(
103            &KinesisClientBuilder {},
104            &self.base.auth,
105            self.base.region.region(),
106            self.base.region.endpoint(),
107            proxy,
108            self.base.tls.as_ref(),
109            None,
110        )
111        .await
112    }
113}
114
115#[async_trait::async_trait]
116#[typetag::serde(name = "aws_kinesis_streams")]
117impl SinkConfig for KinesisStreamsSinkConfig {
118    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
119        let client = self.create_client(&cx.proxy).await?;
120        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
121
122        let batch_settings = self
123            .batch
124            .validate()?
125            .limit_max_bytes(MAX_PAYLOAD_SIZE)?
126            .limit_max_events(MAX_PAYLOAD_EVENTS)?
127            .into_batcher_settings()?;
128
129        let sink = build_sink::<
130            KinesisStreamClient,
131            KinesisRecord,
132            KinesisStreamRecord,
133            KinesisError,
134            KinesisRetryLogic,
135        >(
136            &self.base,
137            self.base.partition_key_field.clone(),
138            batch_settings,
139            KinesisStreamClient { client },
140            KinesisRetryLogic {
141                retry_partial: self.base.request_retry_partial,
142            },
143        )?;
144
145        Ok((sink, healthcheck))
146    }
147
148    fn input(&self) -> Input {
149        self.base.input()
150    }
151
152    fn acknowledgements(&self) -> &AcknowledgementsConfig {
153        self.base.acknowledgements()
154    }
155}
156
157impl GenerateConfig for KinesisStreamsSinkConfig {
158    fn generate_config() -> toml::Value {
159        toml::from_str(
160            r#"partition_key_field = "foo"
161            stream_name = "my-stream"
162            encoding.codec = "json""#,
163        )
164        .unwrap()
165    }
166}
167#[derive(Default, Clone)]
168struct KinesisRetryLogic {
169    retry_partial: bool,
170}
171
172impl RetryLogic for KinesisRetryLogic {
173    type Error = SdkError<KinesisError, HttpResponse>;
174    type Request = BatchKinesisRequest<KinesisStreamRecord>;
175    type Response = KinesisResponse;
176
177    fn is_retriable_error(&self, error: &Self::Error) -> bool {
178        if let SdkError::ServiceError(inner) = error {
179            // Note that if the request partially fails (records sent to one
180            // partition fail but the others do not, for example), Vector
181            // does not retry. This line only covers a failure for the entire
182            // request.
183            //
184            // https://github.com/vectordotdev/vector/issues/359
185            if matches!(
186                inner.err(),
187                PutRecordsError::ProvisionedThroughputExceededException(_)
188            ) {
189                return true;
190            }
191        }
192        is_retriable_error(error)
193    }
194
195    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
196        if response.failure_count > 0 && self.retry_partial {
197            let msg = format!("partial error count {}", response.failure_count);
198            RetryAction::Retry(msg.into())
199        } else {
200            RetryAction::Successful
201        }
202    }
203}
204
#[cfg(test)]
mod tests {
    use super::KinesisStreamsSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` helper against this sink's example config
    /// (presumably verifying that the generated TOML deserializes back into the config type).
    #[test]
    fn generate_config() {
        test_generate_config::<KinesisStreamsSinkConfig>();
    }
}