vector/sinks/aws_kinesis/
sink.rs

1use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
2
3use rand::random;
4use vector_lib::lookup::lookup_v2::ConfigValuePath;
5use vrl::path::PathPrefix;
6
7use crate::{
8    internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
9    sinks::{
10        prelude::*,
11        util::{processed_event::ProcessedEvent, StreamSink},
12    },
13};
14
15use super::{
16    record::Record,
17    request_builder::{KinesisRequest, KinesisRequestBuilder},
18};
19
20pub type KinesisProcessedEvent = ProcessedEvent<LogEvent, KinesisKey>;
21
/// Per-event metadata attached to a [`KinesisProcessedEvent`]: the partition
/// key the record will be sent with.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct KinesisKey {
    /// Partition key chosen for the event (taken from the event or generated
    /// randomly by `process_log`).
    pub partition_key: String,
}
26
27#[derive(Clone)]
28pub struct KinesisSink<S, R> {
29    pub batch_settings: BatcherSettings,
30    pub service: S,
31    pub request_builder: KinesisRequestBuilder<R>,
32    pub partition_key_field: Option<ConfigValuePath>,
33    pub _phantom: PhantomData<R>,
34}
35
36impl<S, R> KinesisSink<S, R>
37where
38    S: Service<BatchKinesisRequest<R>> + Send + 'static,
39    S::Future: Send + 'static,
40    S::Response: DriverResponse + Send + 'static,
41    S::Error: Debug + Into<crate::Error> + Send,
42    R: Record + Send + Sync + Unpin + Clone + 'static,
43{
44    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
45        let batch_settings = self.batch_settings;
46
47        input
48            .filter_map(|event| {
49                // Panic: This sink only accepts Logs, so this should never panic
50                let log = event.into_log();
51                let processed = process_log(log, self.partition_key_field.as_ref());
52
53                future::ready(processed)
54            })
55            .request_builder(
56                default_request_builder_concurrency_limit(),
57                self.request_builder,
58            )
59            .filter_map(|request| async move {
60                match request {
61                    Err(error) => {
62                        emit!(SinkRequestBuildError { error });
63                        None
64                    }
65                    Ok(req) => Some(req),
66                }
67            })
68            .batched(batch_settings.as_byte_size_config())
69            .map(|events| {
70                let metadata = RequestMetadata::from_batch(
71                    events.iter().map(|req| req.get_metadata().clone()),
72                );
73                BatchKinesisRequest { events, metadata }
74            })
75            .into_driver(self.service)
76            .run()
77            .await
78    }
79}
80
81#[async_trait]
82impl<S, R> StreamSink<Event> for KinesisSink<S, R>
83where
84    S: Service<BatchKinesisRequest<R>> + Send + 'static,
85    S::Future: Send + 'static,
86    S::Response: DriverResponse + Send + 'static,
87    S::Error: Debug + Into<crate::Error> + Send,
88    R: Record + Send + Sync + Unpin + Clone + 'static,
89{
90    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
91        self.run_inner(input).await
92    }
93}
94
95/// Returns a `KinesisProcessedEvent` containing the unmodified log event + metadata consisting of
96/// the partition key. The partition key is either generated from the provided partition_key_field
97/// or is generated randomly.
98///
99/// If the provided partition_key_field was not found in the log, `Error` `EventsDropped` internal
100/// events are emitted and None is returned.
101pub(crate) fn process_log(
102    log: LogEvent,
103    partition_key_field: Option<&ConfigValuePath>,
104) -> Option<KinesisProcessedEvent> {
105    let partition_key = if let Some(partition_key_field) = partition_key_field {
106        if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
107            v.to_string_lossy()
108        } else {
109            emit!(AwsKinesisStreamNoPartitionKeyError {
110                partition_key_field: partition_key_field.0.to_string().as_str()
111            });
112            return None;
113        }
114    } else {
115        Cow::Owned(gen_partition_key())
116    };
117    let partition_key = if partition_key.len() >= 256 {
118        partition_key[..256].to_string()
119    } else {
120        partition_key.into_owned()
121    };
122
123    Some(KinesisProcessedEvent {
124        event: log,
125        metadata: KinesisKey { partition_key },
126    })
127}
128
129fn gen_partition_key() -> String {
130    random::<[char; 16]>()
131        .iter()
132        .fold(String::new(), |mut s, c| {
133            s.push(*c);
134            s
135        })
136}
137
138pub struct BatchKinesisRequest<R>
139where
140    R: Record + Clone,
141{
142    pub events: Vec<KinesisRequest<R>>,
143    metadata: RequestMetadata,
144}
145
146impl<R> Clone for BatchKinesisRequest<R>
147where
148    R: Record + Clone,
149{
150    fn clone(&self) -> Self {
151        Self {
152            events: self.events.to_vec(),
153            metadata: self.metadata.clone(),
154        }
155    }
156}
157
158impl<R> Finalizable for BatchKinesisRequest<R>
159where
160    R: Record + Clone,
161{
162    fn take_finalizers(&mut self) -> EventFinalizers {
163        self.events.take_finalizers()
164    }
165}
166
167impl<R> MetaDescriptive for BatchKinesisRequest<R>
168where
169    R: Record + Clone,
170{
171    fn get_metadata(&self) -> &RequestMetadata {
172        &self.metadata
173    }
174
175    fn metadata_mut(&mut self) -> &mut RequestMetadata {
176        &mut self.metadata
177    }
178}