vector/sinks/s3_common/
partitioner.rs

1use vector_lib::{event::Event, partition::Partitioner};
2
3use crate::{internal_events::TemplateRenderingError, template::Template};
4
5#[derive(Clone, Debug, Eq, Hash, PartialEq)]
6pub struct S3PartitionKey {
7    pub key_prefix: String,
8    pub ssekms_key_id: Option<String>,
9}
10
11/// Partitions items based on the generated key for the given event.
12pub struct S3KeyPartitioner {
13    key_prefix_template: Template,
14    ssekms_key_id_template: Option<Template>,
15    dead_letter_key_prefix: Option<String>,
16}
17
18impl S3KeyPartitioner {
19    pub const fn new(
20        key_prefix_template: Template,
21        ssekms_key_id_template: Option<Template>,
22        dead_letter_key_prefix: Option<String>,
23    ) -> Self {
24        Self {
25            key_prefix_template,
26            ssekms_key_id_template,
27            dead_letter_key_prefix,
28        }
29    }
30}
31
32impl Partitioner for S3KeyPartitioner {
33    type Item = Event;
34    type Key = Option<S3PartitionKey>;
35
36    fn partition(&self, item: &Self::Item) -> Self::Key {
37        let key_prefix = self
38            .key_prefix_template
39            .render_string(item)
40            .or_else(|error| {
41                if let Some(dead_letter_key_prefix) = &self.dead_letter_key_prefix {
42                    emit!(TemplateRenderingError {
43                        error,
44                        field: Some("key_prefix"),
45                        drop_event: false,
46                    });
47                    Ok(dead_letter_key_prefix.clone())
48                } else {
49                    Err(emit!(TemplateRenderingError {
50                        error,
51                        field: Some("key_prefix"),
52                        drop_event: true,
53                    }))
54                }
55            })
56            .ok()?;
57
58        let ssekms_key_id = self
59            .ssekms_key_id_template
60            .as_ref()
61            .map(|ssekms_key_id| {
62                ssekms_key_id.render_string(item).map_err(|error| {
63                    emit!(TemplateRenderingError {
64                        error,
65                        field: Some("ssekms_key_id"),
66                        drop_event: true,
67                    });
68                })
69            })
70            .transpose()
71            .ok()?;
72        Some(S3PartitionKey {
73            key_prefix,
74            ssekms_key_id,
75        })
76    }
77}