// vector/config/enrichment_table.rs
use enum_dispatch::enum_dispatch;
use serde::Serialize;
use vector_lib::config::GlobalOptions;
use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
use vector_lib::id::Inputs;
use crate::enrichment_tables::EnrichmentTables;
use super::dot_graph::GraphConfig;
use super::{SinkConfig, SinkOuter};
/// Fully resolved enrichment table component.
// Bundles an `EnrichmentTables` configuration with graph settings and a list of inputs.
// `T` is the element type held by `Inputs<T>` (presumably a component/output identifier —
// TODO confirm against callers).
//
// NOTE(review): the additions in this struct are plain `//` comments on purpose:
// `#[configurable_component]` is a proc macro and may pick up `///` doc text — confirm before
// converting any of them to doc comments.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EnrichmentTableOuter<T>
where
T: Configurable + Serialize + 'static + ToValue + Clone,
{
// The wrapped enrichment table configuration. `#[serde(flatten)]` places its fields at the
// same level as `graph` and `inputs` in the (de)serialized form instead of nesting them.
#[serde(flatten)]
pub inner: EnrichmentTables,
// Graph settings for this component. Falls back to `Default` when absent on deserialization
// and is omitted on serialization when `vector_lib::serde::is_default` reports true.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub graph: GraphConfig,
// Inputs of this component. Falls back to `Inputs::<T>::default` when absent on
// deserialization and is omitted on serialization when `Inputs::is_empty` reports true.
// They are forwarded to the `SinkOuter` produced by `as_sink`.
#[configurable(derived)]
#[serde(
default = "Inputs::<T>::default",
skip_serializing_if = "Inputs::is_empty"
)]
pub inputs: Inputs<T>,
}
impl<T> EnrichmentTableOuter<T>
where
    T: Configurable + Serialize + 'static + ToValue + Clone,
{
    /// Creates an enrichment table component from its inputs and inner table configuration.
    ///
    /// `inner` is converted into [`EnrichmentTables`], `inputs` are gathered with
    /// `Inputs::from_iter`, and `graph` is left at its default value.
    pub fn new<I, IET>(inputs: I, inner: IET) -> Self
    where
        I: IntoIterator<Item = T>,
        IET: Into<EnrichmentTables>,
    {
        let table: EnrichmentTables = inner.into();
        Self {
            inner: table,
            graph: Default::default(),
            inputs: Inputs::from_iter(inputs),
        }
    }

    /// Returns a sink view of this enrichment table, or `None` when the inner table exposes no
    /// sink configuration.
    ///
    /// The resulting [`SinkOuter`] reuses clones of this component's `graph` and `inputs`; its
    /// `healthcheck_uri` is `None` and `healthcheck`, `buffer` and `proxy` take default values.
    //
    // Design note: a component is normally built to fill exactly one role (source, transform,
    // sink or enrichment table). The "memory" enrichment table is special in that it needs two
    // of them: sink and enrichment table. To keep the blast radius of that requirement small,
    // every enrichment table may optionally be mapped into a sink, but in practice only the
    // "memory" table yields one, because the other tables have no "sink_config".
    //
    // This is a compromise: `SinkOuter` is meant to be the deserialization-side representation
    // of a sink's configuration, not the actual configuration itself. Should more multi-role
    // components appear, it would be better to restrict such "Outer" types to deserialization
    // and assemble components and the topology more granularly, from separate "modules" for
    // inputs (which make a component valid as a sink), healthchecks, outputs, and so on.
    pub fn as_sink(&self) -> Option<SinkOuter<T>> {
        // Bail out early for tables that do not double as a sink.
        let sink_config = self.inner.sink_config()?;

        Some(SinkOuter {
            graph: self.graph.clone(),
            inputs: self.inputs.clone(),
            healthcheck_uri: None,
            healthcheck: Default::default(),
            buffer: Default::default(),
            proxy: Default::default(),
            inner: sink_config,
        })
    }

    /// Converts this component into one whose inputs are the result of applying `f` to each
    /// current input; `inner` and `graph` are carried over unchanged.
    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
    where
        U: Configurable + Serialize + 'static + ToValue + Clone,
    {
        // Materialize the mapped values first: the iterator borrows `self.inputs`, while
        // `with_inputs` takes `self` by value.
        let mapped: Vec<U> = self.inputs.iter().map(f).collect();
        self.with_inputs(mapped)
    }

    /// Replaces the inputs of this component with `inputs`, keeping `inner` and `graph`.
    ///
    /// The previous inputs are discarded.
    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
    where
        I: IntoIterator<Item = U>,
        U: Configurable + Serialize + 'static + ToValue + Clone,
    {
        let inputs = Inputs::from_iter(inputs);
        let Self { inner, graph, .. } = self;
        EnrichmentTableOuter {
            inputs,
            inner,
            graph,
        }
    }
}
/// Generalized interface for describing and building enrichment table components.
///
/// Implementors must also be [`NamedComponent`], `Debug`, `Send` and `Sync`.
// NOTE(review): `#[enum_dispatch]` presumably lets the `EnrichmentTables` enum forward these
// methods to its variants — confirm against the enum's definition.
#[enum_dispatch]
pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync {
/// Builds the enrichment table with the given globals.
///
/// If the enrichment table is built successfully, `Ok(...)` is returned containing the
/// enrichment table.
///
/// # Errors
///
/// If an error occurs while building the enrichment table, an error variant explaining the
/// issue is returned.
async fn build(
&self,
globals: &GlobalOptions,
) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;
/// Returns the sink configuration of this enrichment table, if it has one.
///
/// The default implementation returns `None`. An enrichment table that must also act as a
/// sink overrides this method; `EnrichmentTableOuter::as_sink` uses the returned value as the
/// `inner` configuration of the resulting `SinkOuter`.
fn sink_config(&self) -> Option<Box<dyn SinkConfig>> {
None
}
}