vector/config/source.rs

1use std::cell::RefCell;
2use std::collections::HashMap;
3
4use async_trait::async_trait;
5use dyn_clone::DynClone;
6use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
7use vector_config_common::attributes::CustomAttribute;
8use vector_config_common::schema::{SchemaGenerator, SchemaObject};
9use vector_config_macros::configurable_component;
10use vector_lib::{
11    config::{
12        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
13        SourceOutput,
14    },
15    source::Source,
16};
17
18use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
19use crate::{extra_context::ExtraContext, shutdown::ShutdownSignal, SourceSender};
20
/// A heap-allocated, type-erased source configuration (any `SourceConfig` implementor).
pub type BoxedSource = Box<dyn SourceConfig>;
22
23impl Configurable for BoxedSource {
24    fn referenceable_name() -> Option<&'static str> {
25        Some("vector::sources::Sources")
26    }
27
28    fn metadata() -> Metadata {
29        let mut metadata = Metadata::default();
30        metadata.set_description("Configurable sources in Vector.");
31        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
32        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
33        metadata
34    }
35
36    fn generate_schema(
37        generator: &RefCell<SchemaGenerator>,
38    ) -> Result<SchemaObject, GenerateError> {
39        vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
40    }
41}
42
43impl<T: SourceConfig + 'static> From<T> for BoxedSource {
44    fn from(value: T) -> Self {
45        Box::new(value)
46    }
47}
48
/// Fully resolved source component.
// NOTE(review): `#[configurable_component]` presumably turns `///` doc comments into schema
// descriptions, so the notes in this struct use plain `//` comments to avoid altering the
// generated configuration docs. Field order also determines serialization order.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    // Proxy settings for this source; defaulted when absent and skipped on serialization when
    // `is_default` reports the default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Options from the `dot_graph` module; defaulted when absent and skipped on serialization
    // when equal to the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // Never read from or written to the config (`skip`); set programmatically and initialized
    // to `false` by `SourceOuter::new`. Presumably records whether a downstream sink requested
    // acknowledgements — TODO confirm against the topology builder.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    // The concrete source configuration. Flattened so its fields (including the `type` tag used
    // by `SourceConfig`'s typetag registration) sit alongside `proxy` and `graph`; hidden from
    // the generated docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
69
70impl SourceOuter {
71    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
72        Self {
73            proxy: Default::default(),
74            graph: Default::default(),
75            sink_acknowledgements: false,
76            inner: inner.into(),
77        }
78    }
79}
80
81/// Generalized interface for describing and building source components.
82#[async_trait]
83#[typetag::serde(tag = "type")]
84pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
85    /// Builds the source with the given context.
86    ///
87    /// If the source is built successfully, `Ok(...)` is returned containing the source.
88    ///
89    /// # Errors
90    ///
91    /// If an error occurs while building the source, an error variant explaining the issue is
92    /// returned.
93    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
94
95    /// Gets the list of outputs exposed by this source.
96    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
97
98    /// Gets the list of resources, if any, used by this source.
99    ///
100    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
101    /// cannot be shared between components at runtime. This ensures that components can not be
102    /// configured in a way that would deadlock the spawning of a topology, and as well, allows
103    /// Vector to determine the correct order for rebuilding a topology during configuration reload
104    /// when resources must first be reclaimed before being reassigned, and so on.
105    fn resources(&self) -> Vec<Resource> {
106        Vec::new()
107    }
108
109    /// Whether or not this source can acknowledge the events it emits.
110    ///
111    /// Generally, Vector uses acknowledgements to track when an event has finally been processed,
112    /// either successfully or unsuccessfully. While it is used internally in some areas, such as
113    /// within disk buffers for knowing when a message can be deleted from the buffer, it is
114    /// primarily used to signal back to a source that a message has been successfully (durably)
115    /// processed or not.
116    ///
117    /// By exposing whether or not a source supports acknowledgements, we can avoid situations where
118    /// using acknowledgements would only add processing overhead for no benefit to the source, as
119    /// well as emit contextual warnings when end-to-end acknowledgements are enabled, but the
120    /// topology as configured does not actually support the use of end-to-end acknowledgements.
121    fn can_acknowledge(&self) -> bool;
122}
123
// Implements `Clone` for `Box<dyn SourceConfig>` (i.e. `BoxedSource`) by delegating to the
// `DynClone` supertrait; `SourceOuter`'s `#[derive(Clone)]` relies on this for its `inner` field.
dyn_clone::clone_trait_object!(SourceConfig);
125
/// Everything a source receives when it is built; passed by value to [`SourceConfig::build`].
///
/// Field order is preserved deliberately: it determines the order in which fields are dropped.
pub struct SourceContext {
    /// Key of the component being built; logged as `component_id` by `do_acknowledgements`.
    pub key: ComponentKey,
    /// Global options; `globals.acknowledgements` is merged in by `do_acknowledgements`.
    pub globals: GlobalOptions,
    /// Registry of enrichment tables made available to the source.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Shutdown signal for this source. In tests, `new_shutdown` obtains one by registering the
    /// source with a `SourceShutdownCoordinator`, while `new_test` uses `ShutdownSignal::noop()`.
    pub shutdown: ShutdownSignal,
    /// Sender the source emits through (presumably its event output — confirm with `SourceSender`).
    pub out: SourceSender,
    /// Proxy configuration for the source.
    pub proxy: ProxyConfig,
    /// Component-level acknowledgement flag. `do_acknowledgements` merges it after the
    /// source-level and global settings.
    pub acknowledgements: bool,
    /// Schema options; `schema.log_namespace` is the fallback consulted by `log_namespace`.
    pub schema: schema::Options,

    /// Schema definitions for the outputs exposed by the source.
    ///
    /// A source can expose multiple [`SourceOutput`] channels, so each definition is keyed by the
    /// identifier of its `SourceOutput` (presumably `None` for the default output — confirm).
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
146
147impl SourceContext {
148    #[cfg(any(test, feature = "test-utils"))]
149    pub fn new_shutdown(
150        key: &ComponentKey,
151        out: SourceSender,
152    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
153        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
154        let (shutdown_signal, _) = shutdown.register_source(key, false);
155        (
156            Self {
157                key: key.clone(),
158                globals: GlobalOptions::default(),
159                enrichment_tables: Default::default(),
160                shutdown: shutdown_signal,
161                out,
162                proxy: Default::default(),
163                acknowledgements: false,
164                schema_definitions: HashMap::default(),
165                schema: Default::default(),
166                extra_context: Default::default(),
167            },
168            shutdown,
169        )
170    }
171
172    #[cfg(any(test, feature = "test-utils"))]
173    pub fn new_test(
174        out: SourceSender,
175        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
176    ) -> Self {
177        Self {
178            key: ComponentKey::from("default"),
179            globals: GlobalOptions::default(),
180            enrichment_tables: Default::default(),
181            shutdown: ShutdownSignal::noop(),
182            out,
183            proxy: Default::default(),
184            acknowledgements: false,
185            schema_definitions: schema_definitions.unwrap_or_default(),
186            schema: Default::default(),
187            extra_context: Default::default(),
188        }
189    }
190
191    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
192        let config = AcknowledgementsConfig::from(config);
193        if config.enabled() {
194            warn!(
195                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
196                component_id = self.key.id(),
197            );
198        }
199
200        config
201            .merge_default(&self.globals.acknowledgements)
202            .merge_default(&self.acknowledgements.into())
203            .enabled()
204    }
205
206    /// Gets the log namespacing to use. The passed in value is from the source itself
207    /// and will override any global default if it's set.
208    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
209        namespace
210            .or(self.schema.log_namespace)
211            .unwrap_or(false)
212            .into()
213    }
214}