vector/config/
enrichment_table.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use enum_dispatch::enum_dispatch;
use serde::Serialize;
use vector_lib::config::GlobalOptions;
use vector_lib::configurable::{configurable_component, Configurable, NamedComponent, ToValue};
use vector_lib::id::{ComponentKey, Inputs};

use crate::enrichment_tables::EnrichmentTables;

use super::dot_graph::GraphConfig;
use super::{SinkConfig, SinkOuter, SourceConfig, SourceOuter};

/// Fully resolved enrichment table component.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EnrichmentTableOuter<T>
where
    T: Configurable + Serialize + 'static + ToValue + Clone,
{
    #[serde(flatten)]
    pub inner: EnrichmentTables,
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,
    #[configurable(derived)]
    #[serde(
        default = "Inputs::<T>::default",
        skip_serializing_if = "Inputs::is_empty"
    )]
    pub inputs: Inputs<T>,
}

impl<T> EnrichmentTableOuter<T>
where
    T: Configurable + Serialize + 'static + ToValue + Clone,
{
    pub fn new<I, IET>(inputs: I, inner: IET) -> Self
    where
        I: IntoIterator<Item = T>,
        IET: Into<EnrichmentTables>,
    {
        Self {
            inner: inner.into(),
            graph: Default::default(),
            inputs: Inputs::from_iter(inputs),
        }
    }

    // Components are currently built in a way that they match exactly one of the roles (source,
    // transform, sink, enrichment table). Due to specific requirements of the "memory" enrichment
    // table, it has to fulfill 2 of these roles (sink and enrichment table). To reduce the impact
    // of this very specific requirement, any enrichment table can now be optionally mapped into a
    // sink, but this will only work for a "memory" enrichment table, since other tables will not
    // have a "sink_config" present.
    // This is also not ideal, since `SinkOuter` is not meant to represent the actual configuration,
    // but it should just be a representation of that config used for deserialization.
    // In the future, if more such components come up, it would be good to limit such "Outer"
    // components to deserialization and build up the components and the topology in a more granular
    // way, with each having "modules" for inputs (making them valid as sinks), for healthchecks,
    // for providing outputs, etc.
    pub fn as_sink(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SinkOuter<T>)> {
        self.inner.sink_config(default_key).map(|(key, sink)| {
            (
                key,
                SinkOuter {
                    graph: self.graph.clone(),
                    inputs: self.inputs.clone(),
                    healthcheck_uri: None,
                    healthcheck: Default::default(),
                    buffer: Default::default(),
                    proxy: Default::default(),
                    inner: sink,
                },
            )
        })
    }

    pub fn as_source(&self, default_key: &ComponentKey) -> Option<(ComponentKey, SourceOuter)> {
        self.inner.source_config(default_key).map(|(key, source)| {
            (
                key,
                SourceOuter {
                    graph: self.graph.clone(),
                    sink_acknowledgements: false,
                    proxy: Default::default(),
                    inner: source,
                },
            )
        })
    }

    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> EnrichmentTableOuter<U>
    where
        U: Configurable + Serialize + 'static + ToValue + Clone,
    {
        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
        self.with_inputs(inputs)
    }

    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> EnrichmentTableOuter<U>
    where
        I: IntoIterator<Item = U>,
        U: Configurable + Serialize + 'static + ToValue + Clone,
    {
        EnrichmentTableOuter {
            inputs: Inputs::from_iter(inputs),
            inner: self.inner,
            graph: self.graph,
        }
    }
}

/// Generalized interface for describing and building enrichment table components.
#[enum_dispatch]
pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the enrichment table with the given globals.
    ///
    /// If the enrichment table is built successfully, `Ok(...)` is returned containing the
    /// enrichment table.
    ///
    /// # Errors
    ///
    /// If an error occurs while building the enrichment table, an error variant explaining the
    /// issue is returned.
    async fn build(
        &self,
        globals: &GlobalOptions,
    ) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;

    fn sink_config(
        &self,
        _default_key: &ComponentKey,
    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
        None
    }

    fn source_config(
        &self,
        _default_key: &ComponentKey,
    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
        None
    }
}