vector/sinks/aws_kinesis/
config.rs1use std::marker::PhantomData;
2use vector_lib::lookup::lookup_v2::ConfigValuePath;
3
4use vector_lib::stream::BatcherSettings;
5
6use crate::{
7 aws::{AwsAuthentication, RegionOrEndpoint},
8 sinks::{
9 prelude::*,
10 util::{retries::RetryLogic, TowerRequestConfig},
11 },
12};
13
14use super::{
15 record::{Record, SendRecord},
16 request_builder::KinesisRequestBuilder,
17 sink::{BatchKinesisRequest, KinesisSink},
18 KinesisResponse, KinesisService,
19};
20
21#[configurable_component]
25#[derive(Clone, Debug)]
26#[serde(deny_unknown_fields)]
27pub struct KinesisSinkBaseConfig {
28 #[configurable(metadata(docs::examples = "my-stream"))]
32 pub stream_name: String,
33
34 #[serde(flatten)]
35 #[configurable(derived)]
36 pub region: RegionOrEndpoint,
37
38 #[configurable(derived)]
39 pub encoding: EncodingConfig,
40
41 #[configurable(derived)]
42 #[serde(default)]
43 pub compression: Compression,
44
45 #[configurable(derived)]
46 #[serde(default)]
47 pub request: TowerRequestConfig,
48
49 #[configurable(derived)]
50 pub tls: Option<TlsConfig>,
51
52 #[configurable(derived)]
53 #[serde(default)]
54 pub auth: AwsAuthentication,
55
56 #[serde(default)]
58 #[configurable(metadata(docs::advanced))]
59 pub request_retry_partial: bool,
60
61 #[configurable(derived)]
62 #[serde(
63 default,
64 deserialize_with = "crate::serde::bool_or_struct",
65 skip_serializing_if = "crate::serde::is_default"
66 )]
67 pub acknowledgements: AcknowledgementsConfig,
68
69 #[configurable(metadata(docs::examples = "user_id"))]
73 pub partition_key_field: Option<ConfigValuePath>,
74}
75
76impl KinesisSinkBaseConfig {
77 pub fn input(&self) -> Input {
78 Input::new(self.encoding.config().input_type() & DataType::Log)
79 }
80
81 pub const fn acknowledgements(&self) -> &AcknowledgementsConfig {
82 &self.acknowledgements
83 }
84}
85
86pub fn build_sink<C, R, RR, E, RT>(
88 config: &KinesisSinkBaseConfig,
89 partition_key_field: Option<ConfigValuePath>,
90 batch_settings: BatcherSettings,
91 client: C,
92 retry_logic: RT,
93) -> crate::Result<VectorSink>
94where
95 C: SendRecord + Clone + Send + Sync + 'static,
96 <C as SendRecord>::T: Send,
97 <C as SendRecord>::E: Send + Sync + snafu::Error,
98 Vec<<C as SendRecord>::T>: FromIterator<R>,
99 R: Send + 'static,
100 RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
101 E: Send + 'static,
102 RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
103{
104 let request_limits = config.request.into_settings();
105
106 let region = config.region.region();
107 let service = ServiceBuilder::new()
108 .settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
109 .service(KinesisService::<C, R, E> {
110 client,
111 stream_name: config.stream_name.clone(),
112 region,
113 _phantom_t: PhantomData,
114 _phantom_e: PhantomData,
115 });
116
117 let transformer = config.encoding.transformer();
118 let serializer = config.encoding.build()?;
119 let encoder = Encoder::<()>::new(serializer);
120
121 let request_builder = KinesisRequestBuilder::<RR> {
122 compression: config.compression,
123 encoder: (transformer, encoder),
124 _phantom: PhantomData,
125 };
126
127 let sink = KinesisSink {
128 batch_settings,
129 service,
130 request_builder,
131 partition_key_field,
132 _phantom: PhantomData,
133 };
134 Ok(VectorSink::from_event_streamsink(sink))
135}