vector/config/
enrichment_table.rs

1use enum_dispatch::enum_dispatch;
2use serde::Serialize;
3use vector_lib::config::GlobalOptions;
4use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
5use vector_lib::id::{ComponentKey, Inputs};
6
7use crate::enrichment_tables::EnrichmentTables;
8
9use super::dot_graph::GraphConfig;
10use super::{SinkConfig, SinkOuter, SourceConfig, SourceOuter};
11
// Wrapper around an enrichment table configuration (`inner`) together with its graph settings
// and its list of inputs. `T` is the type of each entry stored in `inputs`.
/// Fully resolved enrichment table component.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EnrichmentTableOuter<T>
where
    T: Configurable + Serialize + 'static + ToValue + Clone,
{
    // The concrete enrichment table configuration. Flattened, so its fields are
    // (de)serialized at the same level as `graph` and `inputs`.
    #[serde(flatten)]
    pub inner: EnrichmentTables,
    // Graph configuration for this component. Falls back to `GraphConfig`'s default when
    // absent, and is omitted on serialization when `vector_lib::serde::is_default` says so.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,
    // Inputs feeding this table. Falls back to `Inputs::<T>::default` when absent, and is
    // omitted on serialization when `Inputs::is_empty` returns true.
    // NOTE(review): per the comments on `as_sink`, inputs presumably only matter for tables
    // that can also act as a sink — confirm.
    #[configurable(derived)]
    #[serde(
        default = "Inputs::<T>::default",
        skip_serializing_if = "Inputs::is_empty"
    )]
    pub inputs: Inputs<T>,
}
31
32impl<T> EnrichmentTableOuter<T>
33where
34    T: Configurable + Serialize + 'static + ToValue + Clone,
35{
36    pub fn new<I, IET>(inputs: I, inner: IET) -> Self
37    where
38        I: IntoIterator<Item = T>,
39        IET: Into<EnrichmentTables>,
40    {
41        Self {
42            inner: inner.into(),
43            graph: Default::default(),
44            inputs: Inputs::from_iter(inputs),
45        }
46    }
47
48    // Components are currently built in a way that they match exactly one of the roles (source,
49    // transform, sink, enrichment table). Due to specific requirements of the "memory" enrichment
50    // table, it has to fulfill 2 of these roles (sink and enrichment table). To reduce the impact
51    // of this very specific requirement, any enrichment table can now be optionally mapped into a
52    // sink, but this will only work for a "memory" enrichment table, since other tables will not
53    // have a "sink_config" present.
54    // This is also not ideal, since `SinkOuter` is not meant to represent the actual configuration,
55    // but it should just be a representation of that config used for deserialization.
56    // In the future, if more such components come up, it would be good to limit such "Outer"
57    // components to deserialization and build up the components and the topology in a more granular
58    // way, with each having "modules" for inputs (making them valid as sinks), for healthchecks,
59    // for providing outputs, etc.
60    pub fn as_sink(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SinkOuter<T>)> {
61        self.inner.sink_config(default_key).map(|(key, sink)| {
62            (
63                key,
64                SinkOuter {
65                    graph: self.graph.clone(),
66                    inputs: self.inputs.clone(),
67                    healthcheck_uri: None,
68                    healthcheck: Default::default(),
69                    buffer: Default::default(),
70                    proxy: Default::default(),
71                    inner: sink,
72                },
73            )
74        })
75    }
76
77    pub fn as_source(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SourceOuter)> {
78        self.inner.source_config(default_key).map(|(key, source)| {
79            (
80                key,
81                SourceOuter {
82                    graph: self.graph.clone(),
83                    sink_acknowledgements: false,
84                    proxy: Default::default(),
85                    inner: source,
86                },
87            )
88        })
89    }
90
91    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
92    where
93        U: Configurable + Serialize + 'static + ToValue + Clone,
94    {
95        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
96        self.with_inputs(inputs)
97    }
98
99    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
100    where
101        I: IntoIterator<Item = U>,
102        U: Configurable + Serialize + 'static + ToValue + Clone,
103    {
104        EnrichmentTableOuter {
105            inputs: Inputs::from_iter(inputs),
106            inner: self.inner,
107            graph: self.graph,
108        }
109    }
110}
111
/// Generalized interface for describing and building enrichment table components.
#[enum_dispatch]
pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the enrichment table with the given globals.
    ///
    /// If the enrichment table is built successfully, `Ok(...)` is returned containing the
    /// enrichment table.
    ///
    /// # Errors
    ///
    /// If an error occurs while building the enrichment table, an error variant explaining the
    /// issue is returned.
    async fn build(
        &self,
        globals: &GlobalOptions,
    ) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;

    /// Returns the sink configuration for this enrichment table, if it has one.
    ///
    /// The default implementation returns `None`, meaning the table cannot be mapped into a
    /// sink. Implementors that also need to act as a sink override this to return the component
    /// key and the boxed sink configuration to use.
    ///
    /// NOTE(review): `_default_key` is ignored by the default implementation; presumably it is
    /// the key an implementor falls back to when it does not define its own — confirm against
    /// implementors.
    fn sink_config(
        &self,
        _default_key: &ComponentKey,
    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
        None
    }

    /// Returns the source configuration for this enrichment table, if it has one.
    ///
    /// The default implementation returns `None`, meaning the table cannot be mapped into a
    /// source. Implementors that also need to act as a source override this to return the
    /// component key and the boxed source configuration to use.
    ///
    /// NOTE(review): `_default_key` is ignored by the default implementation; presumably it is
    /// the key an implementor falls back to when it does not define its own — confirm against
    /// implementors.
    fn source_config(
        &self,
        _default_key: &ComponentKey,
    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
        None
    }
}