vector/sinks/gcp_chronicle/
partitioner.rs1use vector_lib::{event::Event, partition::Partitioner};
2
3use crate::{internal_events::TemplateRenderingError, template::Template};
4
5#[derive(Clone, Debug, Eq, Hash, PartialEq)]
6pub struct ChroniclePartitionKey {
7 pub log_type: String,
8 pub namespace: Option<String>,
9}
10
11pub struct ChroniclePartitioner {
13 log_type: Template,
14 fallback_log_type: Option<String>,
15 namespace_template: Option<Template>,
16}
17
18impl ChroniclePartitioner {
19 pub const fn new(
20 log_type: Template,
21 fallback_log_type: Option<String>,
22 namespace_template: Option<Template>,
23 ) -> Self {
24 Self {
25 log_type,
26 fallback_log_type,
27 namespace_template,
28 }
29 }
30}
31
32impl Partitioner for ChroniclePartitioner {
33 type Item = Event;
34 type Key = Option<ChroniclePartitionKey>;
35
36 fn partition(&self, item: &Self::Item) -> Self::Key {
37 let log_type = self
38 .log_type
39 .render_string(item)
40 .or_else(|error| {
41 if let Some(fallback_log_type) = &self.fallback_log_type {
42 emit!(TemplateRenderingError {
43 error,
44 field: Some("log_type"),
45 drop_event: false,
46 });
47 Ok(fallback_log_type.clone())
48 } else {
49 Err(emit!(TemplateRenderingError {
50 error,
51 field: Some("log_type"),
52 drop_event: true,
53 }))
54 }
55 })
56 .ok()?;
57
58 let namespace = self
59 .namespace_template
60 .as_ref()
61 .map(|namespace| {
62 namespace.render_string(item).map_err(|error| {
63 emit!(TemplateRenderingError {
64 error,
65 field: Some("namespace"),
66 drop_event: true,
67 });
68 })
69 })
70 .transpose()
71 .ok()?;
72 Some(ChroniclePartitionKey {
73 log_type,
74 namespace,
75 })
76 }
77}