vector/sinks/aws_kinesis/
sink.rs1use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
2
3use rand::random;
4use vector_lib::lookup::lookup_v2::ConfigValuePath;
5use vrl::path::PathPrefix;
6
7use crate::{
8 internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
9 sinks::{
10 prelude::*,
11 util::{processed_event::ProcessedEvent, StreamSink},
12 },
13};
14
15use super::{
16 record::Record,
17 request_builder::{KinesisRequest, KinesisRequestBuilder},
18};
19
20pub type KinesisProcessedEvent = ProcessedEvent<LogEvent, KinesisKey>;
21
/// Per-event metadata carried alongside a log event through the sink.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct KinesisKey {
    /// Partition key for the record; `process_log` caps it at 256 bytes.
    pub partition_key: String,
}
26
27#[derive(Clone)]
28pub struct KinesisSink<S, R> {
29 pub batch_settings: BatcherSettings,
30 pub service: S,
31 pub request_builder: KinesisRequestBuilder<R>,
32 pub partition_key_field: Option<ConfigValuePath>,
33 pub _phantom: PhantomData<R>,
34}
35
36impl<S, R> KinesisSink<S, R>
37where
38 S: Service<BatchKinesisRequest<R>> + Send + 'static,
39 S::Future: Send + 'static,
40 S::Response: DriverResponse + Send + 'static,
41 S::Error: Debug + Into<crate::Error> + Send,
42 R: Record + Send + Sync + Unpin + Clone + 'static,
43{
44 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
45 let batch_settings = self.batch_settings;
46
47 input
48 .filter_map(|event| {
49 let log = event.into_log();
51 let processed = process_log(log, self.partition_key_field.as_ref());
52
53 future::ready(processed)
54 })
55 .request_builder(
56 default_request_builder_concurrency_limit(),
57 self.request_builder,
58 )
59 .filter_map(|request| async move {
60 match request {
61 Err(error) => {
62 emit!(SinkRequestBuildError { error });
63 None
64 }
65 Ok(req) => Some(req),
66 }
67 })
68 .batched(batch_settings.as_byte_size_config())
69 .map(|events| {
70 let metadata = RequestMetadata::from_batch(
71 events.iter().map(|req| req.get_metadata().clone()),
72 );
73 BatchKinesisRequest { events, metadata }
74 })
75 .into_driver(self.service)
76 .run()
77 .await
78 }
79}
80
81#[async_trait]
82impl<S, R> StreamSink<Event> for KinesisSink<S, R>
83where
84 S: Service<BatchKinesisRequest<R>> + Send + 'static,
85 S::Future: Send + 'static,
86 S::Response: DriverResponse + Send + 'static,
87 S::Error: Debug + Into<crate::Error> + Send,
88 R: Record + Send + Sync + Unpin + Clone + 'static,
89{
90 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
91 self.run_inner(input).await
92 }
93}
94
95pub(crate) fn process_log(
102 log: LogEvent,
103 partition_key_field: Option<&ConfigValuePath>,
104) -> Option<KinesisProcessedEvent> {
105 let partition_key = if let Some(partition_key_field) = partition_key_field {
106 if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
107 v.to_string_lossy()
108 } else {
109 emit!(AwsKinesisStreamNoPartitionKeyError {
110 partition_key_field: partition_key_field.0.to_string().as_str()
111 });
112 return None;
113 }
114 } else {
115 Cow::Owned(gen_partition_key())
116 };
117 let partition_key = if partition_key.len() >= 256 {
118 partition_key[..256].to_string()
119 } else {
120 partition_key.into_owned()
121 };
122
123 Some(KinesisProcessedEvent {
124 event: log,
125 metadata: KinesisKey { partition_key },
126 })
127}
128
129fn gen_partition_key() -> String {
130 random::<[char; 16]>()
131 .iter()
132 .fold(String::new(), |mut s, c| {
133 s.push(*c);
134 s
135 })
136}
137
138pub struct BatchKinesisRequest<R>
139where
140 R: Record + Clone,
141{
142 pub events: Vec<KinesisRequest<R>>,
143 metadata: RequestMetadata,
144}
145
146impl<R> Clone for BatchKinesisRequest<R>
147where
148 R: Record + Clone,
149{
150 fn clone(&self) -> Self {
151 Self {
152 events: self.events.to_vec(),
153 metadata: self.metadata.clone(),
154 }
155 }
156}
157
158impl<R> Finalizable for BatchKinesisRequest<R>
159where
160 R: Record + Clone,
161{
162 fn take_finalizers(&mut self) -> EventFinalizers {
163 self.events.take_finalizers()
164 }
165}
166
167impl<R> MetaDescriptive for BatchKinesisRequest<R>
168where
169 R: Record + Clone,
170{
171 fn get_metadata(&self) -> &RequestMetadata {
172 &self.metadata
173 }
174
175 fn metadata_mut(&mut self) -> &mut RequestMetadata {
176 &mut self.metadata
177 }
178}