vector/sinks/aws_kinesis/
sink.rs

1use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
2
3use rand::random;
4use vector_lib::lookup::lookup_v2::ConfigValuePath;
5use vrl::path::PathPrefix;
6
7use super::{
8    record::Record,
9    request_builder::{KinesisRequest, KinesisRequestBuilder},
10};
11use crate::{
12    internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
13    sinks::{
14        prelude::*,
15        util::{StreamSink, processed_event::ProcessedEvent},
16    },
17};
18
/// A log event bundled with the [`KinesisKey`] metadata (its partition key) that
/// `process_log` attaches to it.
pub type KinesisProcessedEvent = ProcessedEvent<LogEvent, KinesisKey>;
20
/// Metadata attached to each processed log event: the partition key selected for it.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct KinesisKey {
    /// Partition key for the event, either read from the configured log field or
    /// randomly generated (see `process_log`).
    pub partition_key: String,
}
25
/// Stream sink that turns incoming events into batched Kinesis requests and hands them to
/// the service `S`. `R` is the record type produced by the request builder.
#[derive(Clone)]
pub struct KinesisSink<S, R> {
    /// Settings used to group built requests into batches.
    pub batch_settings: BatcherSettings,
    /// Service that receives each `BatchKinesisRequest`.
    pub service: S,
    /// Builder that converts processed events into `KinesisRequest`s.
    pub request_builder: KinesisRequestBuilder<R>,
    /// Optional path of the log field to use as the partition key. When `None`, a random
    /// key is generated for every event.
    pub partition_key_field: Option<ConfigValuePath>,
    /// Marker tying the sink to the record type `R` without storing a value of it.
    pub _phantom: PhantomData<R>,
}
34
35impl<S, R> KinesisSink<S, R>
36where
37    S: Service<BatchKinesisRequest<R>> + Send + 'static,
38    S::Future: Send + 'static,
39    S::Response: DriverResponse + Send + 'static,
40    S::Error: Debug + Into<crate::Error> + Send,
41    R: Record + Send + Sync + Unpin + Clone + 'static,
42{
43    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
44        let batch_settings = self.batch_settings;
45
46        input
47            .filter_map(|event| {
48                // Panic: This sink only accepts Logs, so this should never panic
49                let log = event.into_log();
50                let processed = process_log(log, self.partition_key_field.as_ref());
51
52                future::ready(processed)
53            })
54            .request_builder(
55                default_request_builder_concurrency_limit(),
56                self.request_builder,
57            )
58            .filter_map(|request| async move {
59                match request {
60                    Err(error) => {
61                        emit!(SinkRequestBuildError { error });
62                        None
63                    }
64                    Ok(req) => Some(req),
65                }
66            })
67            .batched(batch_settings.as_byte_size_config())
68            .map(|events| {
69                let metadata = RequestMetadata::from_batch(
70                    events.iter().map(|req| req.get_metadata().clone()),
71                );
72                BatchKinesisRequest { events, metadata }
73            })
74            .into_driver(self.service)
75            .run()
76            .await
77    }
78}
79
80#[async_trait]
81impl<S, R> StreamSink<Event> for KinesisSink<S, R>
82where
83    S: Service<BatchKinesisRequest<R>> + Send + 'static,
84    S::Future: Send + 'static,
85    S::Response: DriverResponse + Send + 'static,
86    S::Error: Debug + Into<crate::Error> + Send,
87    R: Record + Send + Sync + Unpin + Clone + 'static,
88{
89    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
90        self.run_inner(input).await
91    }
92}
93
94/// Returns a `KinesisProcessedEvent` containing the unmodified log event + metadata consisting of
95/// the partition key. The partition key is either generated from the provided partition_key_field
96/// or is generated randomly.
97///
98/// If the provided partition_key_field was not found in the log, `Error` `EventsDropped` internal
99/// events are emitted and None is returned.
100pub(crate) fn process_log(
101    log: LogEvent,
102    partition_key_field: Option<&ConfigValuePath>,
103) -> Option<KinesisProcessedEvent> {
104    let partition_key = if let Some(partition_key_field) = partition_key_field {
105        if let Some(v) = log.get((PathPrefix::Event, partition_key_field)) {
106            v.to_string_lossy()
107        } else {
108            emit!(AwsKinesisStreamNoPartitionKeyError {
109                partition_key_field: partition_key_field.0.to_string().as_str()
110            });
111            return None;
112        }
113    } else {
114        Cow::Owned(gen_partition_key())
115    };
116    let partition_key = if partition_key.len() >= 256 {
117        partition_key[..256].to_string()
118    } else {
119        partition_key.into_owned()
120    };
121
122    Some(KinesisProcessedEvent {
123        event: log,
124        metadata: KinesisKey { partition_key },
125    })
126}
127
128fn gen_partition_key() -> String {
129    random::<[char; 16]>()
130        .iter()
131        .fold(String::new(), |mut s, c| {
132            s.push(*c);
133            s
134        })
135}
136
/// A batch of individual Kinesis requests that is sent to the service as one unit.
pub struct BatchKinesisRequest<R>
where
    R: Record + Clone,
{
    /// The individual requests that make up this batch.
    pub events: Vec<KinesisRequest<R>>,
    /// Metadata for the batch as a whole, built from the metadata of every request in it.
    pub metadata: RequestMetadata,
}
144
145impl<R> Clone for BatchKinesisRequest<R>
146where
147    R: Record + Clone,
148{
149    fn clone(&self) -> Self {
150        Self {
151            events: self.events.to_vec(),
152            metadata: self.metadata.clone(),
153        }
154    }
155}
156
157impl<R> Finalizable for BatchKinesisRequest<R>
158where
159    R: Record + Clone,
160{
161    fn take_finalizers(&mut self) -> EventFinalizers {
162        self.events.take_finalizers()
163    }
164}
165
166impl<R> MetaDescriptive for BatchKinesisRequest<R>
167where
168    R: Record + Clone,
169{
170    fn get_metadata(&self) -> &RequestMetadata {
171        &self.metadata
172    }
173
174    fn metadata_mut(&mut self) -> &mut RequestMetadata {
175        &mut self.metadata
176    }
177}