vector/config/
enrichment_table.rs1use enum_dispatch::enum_dispatch;
2use serde::Serialize;
3use vector_lib::config::GlobalOptions;
4use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
5use vector_lib::id::{ComponentKey, Inputs};
6
7use crate::enrichment_tables::EnrichmentTables;
8
9use super::dot_graph::GraphConfig;
10use super::{SinkConfig, SinkOuter, SourceConfig, SourceOuter};
11
12#[configurable_component]
14#[derive(Clone, Debug)]
15pub struct EnrichmentTableOuter<T>
16where
17 T: Configurable + Serialize + 'static + ToValue + Clone,
18{
19 #[serde(flatten)]
20 pub inner: EnrichmentTables,
21 #[configurable(derived)]
22 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
23 pub graph: GraphConfig,
24 #[configurable(derived)]
25 #[serde(
26 default = "Inputs::<T>::default",
27 skip_serializing_if = "Inputs::is_empty"
28 )]
29 pub inputs: Inputs<T>,
30}
31
32impl<T> EnrichmentTableOuter<T>
33where
34 T: Configurable + Serialize + 'static + ToValue + Clone,
35{
36 pub fn new<I, IET>(inputs: I, inner: IET) -> Self
37 where
38 I: IntoIterator<Item = T>,
39 IET: Into<EnrichmentTables>,
40 {
41 Self {
42 inner: inner.into(),
43 graph: Default::default(),
44 inputs: Inputs::from_iter(inputs),
45 }
46 }
47
48 pub fn as_sink(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SinkOuter<T>)> {
61 self.inner.sink_config(default_key).map(|(key, sink)| {
62 (
63 key,
64 SinkOuter {
65 graph: self.graph.clone(),
66 inputs: self.inputs.clone(),
67 healthcheck_uri: None,
68 healthcheck: Default::default(),
69 buffer: Default::default(),
70 proxy: Default::default(),
71 inner: sink,
72 },
73 )
74 })
75 }
76
77 pub fn as_source(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SourceOuter)> {
78 self.inner.source_config(default_key).map(|(key, source)| {
79 (
80 key,
81 SourceOuter {
82 graph: self.graph.clone(),
83 sink_acknowledgements: false,
84 proxy: Default::default(),
85 inner: source,
86 },
87 )
88 })
89 }
90
91 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
92 where
93 U: Configurable + Serialize + 'static + ToValue + Clone,
94 {
95 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
96 self.with_inputs(inputs)
97 }
98
99 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
100 where
101 I: IntoIterator<Item = U>,
102 U: Configurable + Serialize + 'static + ToValue + Clone,
103 {
104 EnrichmentTableOuter {
105 inputs: Inputs::from_iter(inputs),
106 inner: self.inner,
107 graph: self.graph,
108 }
109 }
110}
111
112#[enum_dispatch]
114pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync {
115 async fn build(
125 &self,
126 globals: &GlobalOptions,
127 ) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;
128
129 fn sink_config(
130 &self,
131 _default_key: &ComponentKey,
132 ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
133 None
134 }
135
136 fn source_config(
137 &self,
138 _default_key: &ComponentKey,
139 ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
140 None
141 }
142}