vector/sinks/aws_kinesis/
config.rs1use std::marker::PhantomData;
2
3use vector_lib::{lookup::lookup_v2::ConfigValuePath, stream::BatcherSettings};
4
5use super::{
6 KinesisResponse, KinesisService,
7 record::{Record, SendRecord},
8 request_builder::KinesisRequestBuilder,
9 sink::{BatchKinesisRequest, KinesisSink},
10};
11use crate::{
12 aws::{AwsAuthentication, RegionOrEndpoint},
13 sinks::{
14 prelude::*,
15 util::{TowerRequestConfig, retries::RetryLogic},
16 },
17};
18
19#[configurable_component]
23#[derive(Clone, Debug)]
24#[serde(deny_unknown_fields)]
25pub struct KinesisSinkBaseConfig {
26 #[configurable(metadata(docs::examples = "my-stream"))]
30 pub stream_name: String,
31
32 #[serde(flatten)]
33 #[configurable(derived)]
34 pub region: RegionOrEndpoint,
35
36 #[configurable(derived)]
37 pub encoding: EncodingConfig,
38
39 #[configurable(derived)]
40 #[serde(default)]
41 pub compression: Compression,
42
43 #[configurable(derived)]
44 #[serde(default)]
45 pub request: TowerRequestConfig,
46
47 #[configurable(derived)]
48 pub tls: Option<TlsConfig>,
49
50 #[configurable(derived)]
51 #[serde(default)]
52 pub auth: AwsAuthentication,
53
54 #[serde(default)]
56 #[configurable(metadata(docs::advanced))]
57 pub request_retry_partial: bool,
58
59 #[configurable(derived)]
60 #[serde(
61 default,
62 deserialize_with = "crate::serde::bool_or_struct",
63 skip_serializing_if = "crate::serde::is_default"
64 )]
65 pub acknowledgements: AcknowledgementsConfig,
66
67 #[configurable(metadata(docs::examples = "user_id"))]
71 pub partition_key_field: Option<ConfigValuePath>,
72}
73
74impl KinesisSinkBaseConfig {
75 pub fn input(&self) -> Input {
76 Input::new(self.encoding.config().input_type() & DataType::Log)
77 }
78
79 pub const fn acknowledgements(&self) -> &AcknowledgementsConfig {
80 &self.acknowledgements
81 }
82}
83
84pub fn build_sink<C, R, RR, E, RT>(
86 config: &KinesisSinkBaseConfig,
87 partition_key_field: Option<ConfigValuePath>,
88 batch_settings: BatcherSettings,
89 client: C,
90 retry_logic: RT,
91) -> crate::Result<VectorSink>
92where
93 C: SendRecord + Clone + Send + Sync + 'static,
94 <C as SendRecord>::T: Send,
95 <C as SendRecord>::E: Send + Sync + snafu::Error,
96 Vec<<C as SendRecord>::T>: FromIterator<R>,
97 R: Send + 'static,
98 RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
99 E: Send + 'static,
100 RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
101{
102 let request_limits = config.request.into_settings();
103
104 let region = config.region.region();
105 let service = ServiceBuilder::new()
106 .settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
107 .service(KinesisService::<C, R, E> {
108 client,
109 stream_name: config.stream_name.clone(),
110 region,
111 _phantom_t: PhantomData,
112 _phantom_e: PhantomData,
113 });
114
115 let transformer = config.encoding.transformer();
116 let serializer = config.encoding.build()?;
117 let encoder = Encoder::<()>::new(serializer);
118
119 let request_builder = KinesisRequestBuilder::<RR> {
120 compression: config.compression,
121 encoder: (transformer, encoder),
122 _phantom: PhantomData,
123 };
124
125 let sink = KinesisSink {
126 batch_settings,
127 service,
128 request_builder,
129 partition_key_field,
130 _phantom: PhantomData,
131 };
132 Ok(VectorSink::from_event_streamsink(sink))
133}