vector/sinks/aws_kinesis/streams/
config.rs

1use aws_sdk_kinesis::operation::{
2    describe_stream::DescribeStreamError, put_records::PutRecordsError,
3};
4use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
5use futures::FutureExt;
6use snafu::Snafu;
7use vector_lib::configurable::{component::GenerateConfig, configurable_component};
8
9use super::{
10    KinesisClient, KinesisError, KinesisRecord, KinesisResponse, KinesisSinkBaseConfig, build_sink,
11    record::{KinesisStreamClient, KinesisStreamRecord},
12    sink::BatchKinesisRequest,
13};
14use crate::{
15    aws::{ClientBuilder, create_client, is_retriable_error},
16    config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
17    sinks::{
18        Healthcheck, VectorSink,
19        prelude::*,
20        util::{
21            BatchConfig, SinkBatchSettings,
22            retries::{RetryAction, RetryLogic},
23        },
24    },
25};
26
27#[allow(clippy::large_enum_variant)]
28#[derive(Debug, Snafu)]
29enum HealthcheckError {
30    #[snafu(display("DescribeStream failed: {}", source))]
31    DescribeStreamFailed {
32        source: SdkError<DescribeStreamError, HttpResponse>,
33    },
34    #[snafu(display("Stream names do not match, got {}, expected {}", name, stream_name))]
35    StreamNamesMismatch { name: String, stream_name: String },
36}
37
38pub struct KinesisClientBuilder;
39
40impl ClientBuilder for KinesisClientBuilder {
41    type Client = KinesisClient;
42
43    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
44        KinesisClient::new(config)
45    }
46}
47
48pub const MAX_PAYLOAD_SIZE: usize = 5_000_000;
49pub const MAX_PAYLOAD_EVENTS: usize = 500;
50
51#[derive(Clone, Copy, Debug, Default)]
52pub struct KinesisDefaultBatchSettings;
53
54impl SinkBatchSettings for KinesisDefaultBatchSettings {
55    const MAX_EVENTS: Option<usize> = Some(MAX_PAYLOAD_EVENTS);
56    const MAX_BYTES: Option<usize> = Some(MAX_PAYLOAD_SIZE);
57    const TIMEOUT_SECS: f64 = 1.0;
58}
59
60/// Configuration for the `aws_kinesis_streams` sink.
61#[configurable_component(sink(
62    "aws_kinesis_streams",
63    "Publish logs to AWS Kinesis Streams topics."
64))]
65#[derive(Clone, Debug)]
66pub struct KinesisStreamsSinkConfig {
67    #[serde(flatten)]
68    pub base: KinesisSinkBaseConfig,
69
70    #[configurable(derived)]
71    #[serde(default)]
72    pub batch: BatchConfig<KinesisDefaultBatchSettings>,
73}
74
75impl KinesisStreamsSinkConfig {
76    async fn healthcheck(self, client: KinesisClient) -> crate::Result<()> {
77        let stream_name = self.base.stream_name;
78
79        let describe_result = client
80            .describe_stream()
81            .stream_name(stream_name.clone())
82            .set_exclusive_start_shard_id(None)
83            .limit(1)
84            .send()
85            .await;
86
87        match describe_result {
88            Ok(resp) => {
89                let name = resp
90                    .stream_description
91                    .map(|x| x.stream_name)
92                    .unwrap_or_default();
93                if name == stream_name {
94                    Ok(())
95                } else {
96                    Err(HealthcheckError::StreamNamesMismatch { name, stream_name }.into())
97                }
98            }
99            Err(source) => Err(HealthcheckError::DescribeStreamFailed { source }.into()),
100        }
101    }
102
103    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
104        create_client::<KinesisClientBuilder>(
105            &KinesisClientBuilder {},
106            &self.base.auth,
107            self.base.region.region(),
108            self.base.region.endpoint(),
109            proxy,
110            self.base.tls.as_ref(),
111            None,
112        )
113        .await
114    }
115}
116
117#[async_trait::async_trait]
118#[typetag::serde(name = "aws_kinesis_streams")]
119impl SinkConfig for KinesisStreamsSinkConfig {
120    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
121        let client = self.create_client(&cx.proxy).await?;
122        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
123
124        let batch_settings = self
125            .batch
126            .validate()?
127            .limit_max_bytes(MAX_PAYLOAD_SIZE)?
128            .limit_max_events(MAX_PAYLOAD_EVENTS)?
129            .into_batcher_settings()?;
130
131        let sink = build_sink::<
132            KinesisStreamClient,
133            KinesisRecord,
134            KinesisStreamRecord,
135            KinesisError,
136            KinesisRetryLogic,
137        >(
138            &self.base,
139            self.base.partition_key_field.clone(),
140            batch_settings,
141            KinesisStreamClient { client },
142            KinesisRetryLogic {
143                retry_partial: self.base.request_retry_partial,
144            },
145        )?;
146
147        Ok((sink, healthcheck))
148    }
149
150    fn input(&self) -> Input {
151        self.base.input()
152    }
153
154    fn acknowledgements(&self) -> &AcknowledgementsConfig {
155        self.base.acknowledgements()
156    }
157}
158
159impl GenerateConfig for KinesisStreamsSinkConfig {
160    fn generate_config() -> toml::Value {
161        toml::from_str(
162            r#"partition_key_field = "foo"
163            stream_name = "my-stream"
164            encoding.codec = "json""#,
165        )
166        .unwrap()
167    }
168}
169#[derive(Default, Clone)]
170struct KinesisRetryLogic {
171    retry_partial: bool,
172}
173
174impl RetryLogic for KinesisRetryLogic {
175    type Error = SdkError<KinesisError, HttpResponse>;
176    type Request = BatchKinesisRequest<KinesisStreamRecord>;
177    type Response = KinesisResponse;
178
179    fn is_retriable_error(&self, error: &Self::Error) -> bool {
180        if let SdkError::ServiceError(inner) = error {
181            // Note that if the request partially fails (records sent to one
182            // partition fail but the others do not, for example), Vector
183            // does not retry. This line only covers a failure for the entire
184            // request.
185            //
186            // https://github.com/vectordotdev/vector/issues/359
187            if matches!(
188                inner.err(),
189                PutRecordsError::ProvisionedThroughputExceededException(_)
190            ) {
191                return true;
192            }
193        }
194        is_retriable_error(error)
195    }
196
197    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
198        if response.failure_count > 0 && self.retry_partial && !response.failed_records.is_empty() {
199            let failed_records = response.failed_records.clone();
200            RetryAction::RetryPartial(Box::new(move |original_request| {
201                let failed_events: Vec<_> = failed_records
202                    .iter()
203                    .filter_map(|r| original_request.events.get(r.index).cloned())
204                    .collect();
205
206                let metadata = RequestMetadata::from_batch(
207                    failed_events.iter().map(|req| req.get_metadata().clone()),
208                );
209
210                BatchKinesisRequest {
211                    events: failed_events,
212                    metadata,
213                }
214            }))
215        } else {
216            RetryAction::Successful
217        }
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::KinesisStreamsSinkConfig;
224
225    #[test]
226    fn generate_config() {
227        crate::test_util::test_generate_config::<KinesisStreamsSinkConfig>();
228    }
229}