vector/sinks/gcp_chronicle/
partitioner.rs

1use vector_lib::{event::Event, partition::Partitioner};
2
3use crate::{internal_events::TemplateRenderingError, template::Template};
4
/// Key that identifies the partition an event belongs to.
///
/// Two events land in the same partition exactly when both their rendered
/// log type and their rendered namespace compare equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ChroniclePartitionKey {
    /// Log type rendered for the event (or the configured fallback value).
    pub log_type: String,
    /// Namespace rendered for the event; `None` when no namespace template
    /// is configured on the partitioner.
    pub namespace: Option<String>,
}
10
11/// Partitions items based on the generated key for the given event.
12pub struct ChroniclePartitioner {
13    log_type: Template,
14    fallback_log_type: Option<String>,
15    namespace_template: Option<Template>,
16}
17
18impl ChroniclePartitioner {
19    pub const fn new(
20        log_type: Template,
21        fallback_log_type: Option<String>,
22        namespace_template: Option<Template>,
23    ) -> Self {
24        Self {
25            log_type,
26            fallback_log_type,
27            namespace_template,
28        }
29    }
30}
31
32impl Partitioner for ChroniclePartitioner {
33    type Item = Event;
34    type Key = Option<ChroniclePartitionKey>;
35
36    fn partition(&self, item: &Self::Item) -> Self::Key {
37        let log_type = self
38            .log_type
39            .render_string(item)
40            .or_else(|error| {
41                if let Some(fallback_log_type) = &self.fallback_log_type {
42                    emit!(TemplateRenderingError {
43                        error,
44                        field: Some("log_type"),
45                        drop_event: false,
46                    });
47                    Ok(fallback_log_type.clone())
48                } else {
49                    Err(emit!(TemplateRenderingError {
50                        error,
51                        field: Some("log_type"),
52                        drop_event: true,
53                    }))
54                }
55            })
56            .ok()?;
57
58        let namespace = self
59            .namespace_template
60            .as_ref()
61            .map(|namespace| {
62                namespace.render_string(item).map_err(|error| {
63                    emit!(TemplateRenderingError {
64                        error,
65                        field: Some("namespace"),
66                        drop_event: true,
67                    });
68                })
69            })
70            .transpose()
71            .ok()?;
72        Some(ChroniclePartitionKey {
73            log_type,
74            namespace,
75        })
76    }
77}