vector/sinks/aws_kinesis/
config.rs

1use std::marker::PhantomData;
2
3use vector_lib::{lookup::lookup_v2::ConfigValuePath, stream::BatcherSettings};
4
5use super::{
6    KinesisResponse, KinesisService,
7    record::{Record, SendRecord},
8    request_builder::KinesisRequestBuilder,
9    sink::{BatchKinesisRequest, KinesisSink},
10};
11use crate::{
12    aws::{AwsAuthentication, RegionOrEndpoint},
13    sinks::{
14        prelude::*,
15        util::{TowerRequestConfig, retries::RetryLogic},
16    },
17};
18
19/// Base configuration for the `aws_kinesis_` sinks.
20/// The actual specific sink configuration types should either wrap this in a newtype wrapper,
21/// or should extend it in a new struct with `serde(flatten)`.
22#[configurable_component]
23#[derive(Clone, Debug)]
24#[serde(deny_unknown_fields)]
25pub struct KinesisSinkBaseConfig {
26    /// The [stream name][stream_name] of the target Kinesis Firehose delivery stream.
27    ///
28    /// [stream_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
29    #[configurable(metadata(docs::examples = "my-stream"))]
30    pub stream_name: String,
31
32    #[serde(flatten)]
33    #[configurable(derived)]
34    pub region: RegionOrEndpoint,
35
36    #[configurable(derived)]
37    pub encoding: EncodingConfig,
38
39    #[configurable(derived)]
40    #[serde(default)]
41    pub compression: Compression,
42
43    #[configurable(derived)]
44    #[serde(default)]
45    pub request: TowerRequestConfig,
46
47    #[configurable(derived)]
48    pub tls: Option<TlsConfig>,
49
50    #[configurable(derived)]
51    #[serde(default)]
52    pub auth: AwsAuthentication,
53
54    /// Whether or not to retry successful requests containing partial failures.
55    #[serde(default)]
56    #[configurable(metadata(docs::advanced))]
57    pub request_retry_partial: bool,
58
59    #[configurable(derived)]
60    #[serde(
61        default,
62        deserialize_with = "crate::serde::bool_or_struct",
63        skip_serializing_if = "crate::serde::is_default"
64    )]
65    pub acknowledgements: AcknowledgementsConfig,
66
67    /// The log field used as the Kinesis record’s partition key value.
68    ///
69    /// If not specified, a unique partition key is generated for each Kinesis record.
70    #[configurable(metadata(docs::examples = "user_id"))]
71    pub partition_key_field: Option<ConfigValuePath>,
72}
73
74impl KinesisSinkBaseConfig {
75    pub fn input(&self) -> Input {
76        Input::new(self.encoding.config().input_type() & DataType::Log)
77    }
78
79    pub const fn acknowledgements(&self) -> &AcknowledgementsConfig {
80        &self.acknowledgements
81    }
82}
83
84/// Builds an aws_kinesis sink.
85pub fn build_sink<C, R, RR, E, RT>(
86    config: &KinesisSinkBaseConfig,
87    partition_key_field: Option<ConfigValuePath>,
88    batch_settings: BatcherSettings,
89    client: C,
90    retry_logic: RT,
91) -> crate::Result<VectorSink>
92where
93    C: SendRecord + Clone + Send + Sync + 'static,
94    <C as SendRecord>::T: Send,
95    <C as SendRecord>::E: Send + Sync + snafu::Error,
96    Vec<<C as SendRecord>::T>: FromIterator<R>,
97    R: Send + 'static,
98    RR: Record + Record<T = R> + Clone + Send + Sync + Unpin + 'static,
99    E: Send + 'static,
100    RT: RetryLogic<Request = BatchKinesisRequest<RR>, Response = KinesisResponse> + Default,
101{
102    let request_limits = config.request.into_settings();
103
104    let region = config.region.region();
105    let service = ServiceBuilder::new()
106        .settings::<RT, BatchKinesisRequest<RR>>(request_limits, retry_logic)
107        .service(KinesisService::<C, R, E> {
108            client,
109            stream_name: config.stream_name.clone(),
110            region,
111            _phantom_t: PhantomData,
112            _phantom_e: PhantomData,
113        });
114
115    let transformer = config.encoding.transformer();
116    let serializer = config.encoding.build()?;
117    let encoder = Encoder::<()>::new(serializer);
118
119    let request_builder = KinesisRequestBuilder::<RR> {
120        compression: config.compression,
121        encoder: (transformer, encoder),
122        _phantom: PhantomData,
123    };
124
125    let sink = KinesisSink {
126        batch_settings,
127        service,
128        request_builder,
129        partition_key_field,
130        _phantom: PhantomData,
131    };
132    Ok(VectorSink::from_event_streamsink(sink))
133}