vector/sinks/s3_common/
partitioner.rs1use vector_lib::{event::Event, partition::Partitioner};
2
3use crate::{internal_events::TemplateRenderingError, template::Template};
4
5#[derive(Clone, Debug, Eq, Hash, PartialEq)]
6pub struct S3PartitionKey {
7 pub key_prefix: String,
8 pub ssekms_key_id: Option<String>,
9}
10
11pub struct S3KeyPartitioner {
13 key_prefix_template: Template,
14 ssekms_key_id_template: Option<Template>,
15 dead_letter_key_prefix: Option<String>,
16}
17
18impl S3KeyPartitioner {
19 pub const fn new(
20 key_prefix_template: Template,
21 ssekms_key_id_template: Option<Template>,
22 dead_letter_key_prefix: Option<String>,
23 ) -> Self {
24 Self {
25 key_prefix_template,
26 ssekms_key_id_template,
27 dead_letter_key_prefix,
28 }
29 }
30}
31
32impl Partitioner for S3KeyPartitioner {
33 type Item = Event;
34 type Key = Option<S3PartitionKey>;
35
36 fn partition(&self, item: &Self::Item) -> Self::Key {
37 let key_prefix = self
38 .key_prefix_template
39 .render_string(item)
40 .or_else(|error| {
41 if let Some(dead_letter_key_prefix) = &self.dead_letter_key_prefix {
42 emit!(TemplateRenderingError {
43 error,
44 field: Some("key_prefix"),
45 drop_event: false,
46 });
47 Ok(dead_letter_key_prefix.clone())
48 } else {
49 Err(emit!(TemplateRenderingError {
50 error,
51 field: Some("key_prefix"),
52 drop_event: true,
53 }))
54 }
55 })
56 .ok()?;
57
58 let ssekms_key_id = self
59 .ssekms_key_id_template
60 .as_ref()
61 .map(|ssekms_key_id| {
62 ssekms_key_id.render_string(item).map_err(|error| {
63 emit!(TemplateRenderingError {
64 error,
65 field: Some("ssekms_key_id"),
66 drop_event: true,
67 });
68 })
69 })
70 .transpose()
71 .ok()?;
72 Some(S3PartitionKey {
73 key_prefix,
74 ssekms_key_id,
75 })
76 }
77}