vector/sinks/aws_kinesis/
config.rs

1use std::marker::PhantomData;
2use vector_lib::lookup::lookup_v2::ConfigValuePath;
3
4use vector_lib::stream::BatcherSettings;
5
6use crate::{
7    aws::{AwsAuthentication, RegionOrEndpoint},
8    sinks::{
9        prelude::*,
10        util::{retries::RetryLogic, TowerRequestConfig},
11    },
12};
13
14use super::{
15    record::{Record, SendRecord},
16    request_builder::KinesisRequestBuilder,
17    sink::{BatchKinesisRequest, KinesisSink},
18    KinesisResponse, KinesisService,
19};
20
/// Base configuration for the `aws_kinesis_` sinks.
/// The actual specific sink configuration types should either wrap this in a newtype wrapper,
/// or should extend it in a new struct with `serde(flatten)`.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten`, yet this struct both flattens `region` and is meant to be flattened itself.
// Confirm that unknown-field rejection behaves as intended for the concrete sinks.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KinesisSinkBaseConfig {
    // NOTE(review): the description below mentions a Firehose delivery stream and links to the
    // CloudWatch Logs documentation, although this struct is the shared base for all
    // `aws_kinesis_` sinks. Confirm the wording and link are the intended ones.
    /// The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
    ///
    /// [stream_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[configurable(metadata(docs::examples = "my-stream"))]
    pub stream_name: String,

    // Flattened: the fields of `RegionOrEndpoint` are read from the same level as the other
    // fields of this struct rather than from a nested `region` table.
    #[serde(flatten)]
    #[configurable(derived)]
    pub region: RegionOrEndpoint,

    // Encoding settings; `build_sink` derives both the transformer and the serializer from this.
    // No `serde(default)`, so the field has to be present in the configuration.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Falls back to `Compression::default()` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Request settings handed to the service builder in `build_sink`; falls back to
    // `TowerRequestConfig::default()` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional TLS settings. Not read anywhere in this file; presumably consumed by the
    // concrete sinks when they create their AWS client (TODO confirm).
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Falls back to `AwsAuthentication::default()` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    /// Whether or not to retry successful requests containing partial failures.
    // Defaults to `false` when omitted (`serde(default)` on a `bool`).
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub request_retry_partial: bool,

    // Deserialized through `crate::serde::bool_or_struct` (presumably accepting either a bare
    // boolean or the full struct form; verify against that helper) and left out of serialized
    // output whenever `crate::serde::is_default` reports the value as the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// The log field used as the Kinesis record’s partition key value.
    ///
    /// If not specified, a unique partition key is generated for each Kinesis record.
    #[configurable(metadata(docs::examples = "user_id"))]
    pub partition_key_field: Option<ConfigValuePath>,
}
75
76impl KinesisSinkBaseConfig {
77    pub fn input(&self) -> Input {
78        Input::new(self.encoding.config().input_type() & DataType::Log)
79    }
80
81    pub const fn acknowledgements(&self) -> &AcknowledgementsConfig {
82        &self.acknowledgements
83    }
84}
85
86/// Builds an aws_kinesis sink.
87pub fn build_sink<C, R, RR, E, RT>(
88    config: &KinesisSinkBaseConfig,
89    partition_key_field: Option<ConfigValuePath>,
90    batch_settings: BatcherSettings,
91    client: C,
92    retry_logic: RT,
93) -> crate::Result<VectorSink>
94where
95    C: SendRecord + Clone + Send + Sync + 'static,
96    <C as SendRecord>::T: Send,
97    <C as SendRecord>::E: Send + Sync + snafu::Error,
98    Vec<<C as SendRecord>::T>: FromIterator<R>,
99    R: Send + 'static,
100    RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
101    E: Send + 'static,
102    RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
103{
104    let request_limits = config.request.into_settings();
105
106    let region = config.region.region();
107    let service = ServiceBuilder::new()
108        .settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
109        .service(KinesisService::<C, R, E> {
110            client,
111            stream_name: config.stream_name.clone(),
112            region,
113            _phantom_t: PhantomData,
114            _phantom_e: PhantomData,
115        });
116
117    let transformer = config.encoding.transformer();
118    let serializer = config.encoding.build()?;
119    let encoder = Encoder::<()>::new(serializer);
120
121    let request_builder = KinesisRequestBuilder::<RR> {
122        compression: config.compression,
123        encoder: (transformer, encoder),
124        _phantom: PhantomData,
125    };
126
127    let sink = KinesisSink {
128        batch_settings,
129        service,
130        request_builder,
131        partition_key_field,
132        _phantom: PhantomData,
133    };
134    Ok(VectorSink::from_event_streamsink(sink))
135}