vector/sinks/aws_kinesis/
sink.rs1use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
2
3use rand::random;
4use vector_lib::lookup::lookup_v2::ConfigValuePath;
5use vrl::path::PathPrefix;
6
7use super::{
8 record::Record,
9 request_builder::{KinesisRequest, KinesisRequestBuilder},
10};
11use crate::{
12 internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
13 sinks::{
14 prelude::*,
15 util::{StreamSink, processed_event::ProcessedEvent},
16 },
17};
18
19pub type KinesisProcessedEvent = ProcessedEvent<LogEvent, KinesisKey>;
20
/// Per-event metadata attached by `process_log`: the partition key for the record.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct KinesisKey {
    /// Partition key assigned to the event's record.
    pub partition_key: String,
}
25
26#[derive(Clone)]
27pub struct KinesisSink<S, R> {
28 pub batch_settings: BatcherSettings,
29 pub service: S,
30 pub request_builder: KinesisRequestBuilder<R>,
31 pub partition_key_field: Option<ConfigValuePath>,
32 pub _phantom: PhantomData<R>,
33}
34
35impl<S, R> KinesisSink<S, R>
36where
37 S: Service<BatchKinesisRequest<R>> + Send + 'static,
38 S::Future: Send + 'static,
39 S::Response: DriverResponse + Send + 'static,
40 S::Error: Debug + Into<crate::Error> + Send,
41 R: Record + Send + Sync + Unpin + Clone + 'static,
42{
43 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
44 let batch_settings = self.batch_settings;
45
46 input
47 .filter_map(|event| {
48 let log = event.into_log();
50 let processed = process_log(log, self.partition_key_field.as_ref());
51
52 future::ready(processed)
53 })
54 .request_builder(
55 default_request_builder_concurrency_limit(),
56 self.request_builder,
57 )
58 .filter_map(|request| async move {
59 match request {
60 Err(error) => {
61 emit!(SinkRequestBuildError { error });
62 None
63 }
64 Ok(req) => Some(req),
65 }
66 })
67 .batched(batch_settings.as_byte_size_config())
68 .map(|events| {
69 let metadata = RequestMetadata::from_batch(
70 events.iter().map(|req| req.get_metadata().clone()),
71 );
72 BatchKinesisRequest { events, metadata }
73 })
74 .into_driver(self.service)
75 .run()
76 .await
77 }
78}
79
80#[async_trait]
81impl<S, R> StreamSink<Event> for KinesisSink<S, R>
82where
83 S: Service<BatchKinesisRequest<R>> + Send + 'static,
84 S::Future: Send + 'static,
85 S::Response: DriverResponse + Send + 'static,
86 S::Error: Debug + Into<crate::Error> + Send,
87 R: Record + Send + Sync + Unpin + Clone + 'static,
88{
89 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
90 self.run_inner(input).await
91 }
92}
93
94pub(crate) fn process_log(
101 log: LogEvent,
102 partition_key_field: Option<&ConfigValuePath>,
103) -> Option<KinesisProcessedEvent> {
104 let partition_key = if let Some(partition_key_field) = partition_key_field {
105 if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
106 v.to_string_lossy()
107 } else {
108 emit!(AwsKinesisStreamNoPartitionKeyError {
109 partition_key_field: partition_key_field.0.to_string().as_str()
110 });
111 return None;
112 }
113 } else {
114 Cow::Owned(gen_partition_key())
115 };
116 let partition_key = if partition_key.len() >= 256 {
117 partition_key[..256].to_string()
118 } else {
119 partition_key.into_owned()
120 };
121
122 Some(KinesisProcessedEvent {
123 event: log,
124 metadata: KinesisKey { partition_key },
125 })
126}
127
128fn gen_partition_key() -> String {
129 random::<[char; 16]>()
130 .iter()
131 .fold(String::new(), |mut s, c| {
132 s.push(*c);
133 s
134 })
135}
136
137pub struct BatchKinesisRequest<R>
138where
139 R: Record + Clone,
140{
141 pub events: Vec<KinesisRequest<R>>,
142 pub metadata: RequestMetadata,
143}
144
145impl<R> Clone for BatchKinesisRequest<R>
146where
147 R: Record + Clone,
148{
149 fn clone(&self) -> Self {
150 Self {
151 events: self.events.to_vec(),
152 metadata: self.metadata.clone(),
153 }
154 }
155}
156
157impl<R> Finalizable for BatchKinesisRequest<R>
158where
159 R: Record + Clone,
160{
161 fn take_finalizers(&mut self) -> EventFinalizers {
162 self.events.take_finalizers()
163 }
164}
165
166impl<R> MetaDescriptive for BatchKinesisRequest<R>
167where
168 R: Record + Clone,
169{
170 fn get_metadata(&self) -> &RequestMetadata {
171 &self.metadata
172 }
173
174 fn metadata_mut(&mut self) -> &mut RequestMetadata {
175 &mut self.metadata
176 }
177}