vector/config/source.rs

1use std::{cell::RefCell, collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7    attributes::CustomAttribute,
8    schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12    config::{
13        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14        SourceOutput,
15    },
16    source::Source,
17};
18
19use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
20use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
21
/// A type-erased, heap-allocated source configuration.
///
/// Boxing lets configurations of different concrete source types share one type, e.g. the
/// flattened `inner` field of [`SourceOuter`]; the concrete implementation behind the box is
/// selected through `typetag` on [`SourceConfig`].
pub type BoxedSource = Box<dyn SourceConfig>;
23
24impl Configurable for BoxedSource {
25    fn referenceable_name() -> Option<&'static str> {
26        Some("vector::sources::Sources")
27    }
28
29    fn metadata() -> Metadata {
30        let mut metadata = Metadata::default();
31        metadata.set_description("Configurable sources in Vector.");
32        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
33        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
34        metadata
35    }
36
37    fn generate_schema(
38        generator: &RefCell<SchemaGenerator>,
39    ) -> Result<SchemaObject, GenerateError> {
40        vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
41    }
42}
43
44impl<T: SourceConfig + 'static> From<T> for BoxedSource {
45    fn from(value: T) -> Self {
46        Box::new(value)
47    }
48}
49
/// Fully resolved source component.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    // Per-source proxy settings. Defaults when absent; skipped during serialization when
    // `vector_lib::serde::is_default` says so (presumably when equal to the default).
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Graph settings (type from the `dot_graph` module). Same default/skip handling as `proxy`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // Not part of the user-facing configuration: serde skips it entirely, so it is `false`
    // after deserialization (and `SourceOuter::new` also sets it to `false`).
    // NOTE(review): presumably filled in later from sink-side acknowledgement settings — confirm.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    // The concrete source configuration. Flattened, so its fields — including the `type` tag
    // that `typetag` uses to pick the implementation — sit alongside `proxy` and `graph`.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
70
71impl SourceOuter {
72    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
73        Self {
74            proxy: Default::default(),
75            graph: Default::default(),
76            sink_acknowledgements: false,
77            inner: inner.into(),
78        }
79    }
80}
81
/// Generalized interface for describing and building source components.
///
/// Implementations are (de)serialized through `typetag` as internally tagged values whose `type`
/// field names the concrete source. `DynClone` is a supertrait so that boxed configurations can
/// be cloned (via the `dyn_clone::clone_trait_object!` invocation for this trait).
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the source with the given context.
    ///
    /// `cx` carries the runtime pieces the source needs, such as its component key, the `out`
    /// sender, and its shutdown signal.
    ///
    /// If the source is built successfully, `Ok(...)` is returned containing the source.
    ///
    /// # Errors
    ///
    /// If an error occurs while building the source, an error variant explaining the issue is
    /// returned.
    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;

    /// Gets the list of outputs exposed by this source.
    ///
    /// NOTE(review): `global_log_namespace` is presumably the globally configured log namespace,
    /// to be combined with the source's own setting (compare `SourceContext::log_namespace`) —
    /// confirm per implementation.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;

    /// Gets the list of resources, if any, used by this source.
    ///
    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
    /// cannot be shared between components at runtime. This ensures that components cannot be
    /// configured in a way that would deadlock the spawning of a topology, and as well, allows
    /// Vector to determine the correct order for rebuilding a topology during configuration reload
    /// when resources must first be reclaimed before being reassigned, and so on.
    ///
    /// The default implementation reports no resources.
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }

    /// Whether or not this source can acknowledge the events it emits.
    ///
    /// Generally, Vector uses acknowledgements to track when an event has finally been processed,
    /// either successfully or unsuccessfully. While it is used internally in some areas, such as
    /// within disk buffers for knowing when a message can be deleted from the buffer, it is
    /// primarily used to signal back to a source that a message has been successfully (durably)
    /// processed or not.
    ///
    /// By exposing whether or not a source supports acknowledgements, we can avoid situations where
    /// using acknowledgements would only add processing overhead for no benefit to the source, as
    /// well as emit contextual warnings when end-to-end acknowledgements are enabled, but the
    /// topology as configured does not actually support the use of end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool;

    /// Optional send timeout for this source's `out` channel.
    ///
    /// If this source supports timeout returns from the `SourceSender` and the configuration
    /// provides a timeout value, return it here and the `out` channel will be configured with it.
    /// The default implementation returns `None`, so no timeout is configured.
    fn send_timeout(&self) -> Option<Duration> {
        None
    }
}
130
// Implements `Clone` for `Box<dyn SourceConfig>` (i.e. `BoxedSource`) on top of the `DynClone`
// supertrait; `SourceOuter`'s `#[derive(Clone)]` relies on this for its `inner` field.
dyn_clone::clone_trait_object!(SourceConfig);
132
/// Runtime context handed to [`SourceConfig::build`].
pub struct SourceContext {
    /// Key identifying this source component (used, for example, as `component_id` in the
    /// source-level acknowledgements deprecation warning).
    pub key: ComponentKey,
    /// Global configuration options; `do_acknowledgements` merges in
    /// `globals.acknowledgements`.
    pub globals: GlobalOptions,
    /// Registry of enrichment tables.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Shutdown signal for this source (in `new_shutdown` it is registered with a
    /// `SourceShutdownCoordinator`).
    pub shutdown: ShutdownSignal,
    /// Channel the source sends its output through.
    pub out: SourceSender,
    /// Proxy configuration for this source.
    pub proxy: ProxyConfig,
    /// Acknowledgement flag merged in last by `do_acknowledgements`, after the source-level and
    /// global settings.
    pub acknowledgements: bool,
    /// Schema options; `log_namespace` falls back to `schema.log_namespace` when the source does
    /// not choose a namespace itself.
    pub schema: schema::Options,

    /// Schema definitions for the outputs exposed by the source.
    ///
    /// Given a source can expose multiple [`SourceOutput`] channels, each definition is keyed by
    /// the identifier of that `SourceOutput`. NOTE(review): a `None` key presumably denotes the
    /// default (unnamed) output — confirm.
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
153
154impl SourceContext {
155    #[cfg(any(test, feature = "test-utils"))]
156    pub fn new_shutdown(
157        key: &ComponentKey,
158        out: SourceSender,
159    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
160        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
161        let (shutdown_signal, _) = shutdown.register_source(key, false);
162        (
163            Self {
164                key: key.clone(),
165                globals: GlobalOptions::default(),
166                enrichment_tables: Default::default(),
167                shutdown: shutdown_signal,
168                out,
169                proxy: Default::default(),
170                acknowledgements: false,
171                schema_definitions: HashMap::default(),
172                schema: Default::default(),
173                extra_context: Default::default(),
174            },
175            shutdown,
176        )
177    }
178
179    #[cfg(any(test, feature = "test-utils"))]
180    pub fn new_test(
181        out: SourceSender,
182        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
183    ) -> Self {
184        Self {
185            key: ComponentKey::from("default"),
186            globals: GlobalOptions::default(),
187            enrichment_tables: Default::default(),
188            shutdown: ShutdownSignal::noop(),
189            out,
190            proxy: Default::default(),
191            acknowledgements: false,
192            schema_definitions: schema_definitions.unwrap_or_default(),
193            schema: Default::default(),
194            extra_context: Default::default(),
195        }
196    }
197
198    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
199        let config = AcknowledgementsConfig::from(config);
200        if config.enabled() {
201            warn!(
202                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
203                component_id = self.key.id(),
204            );
205        }
206
207        config
208            .merge_default(&self.globals.acknowledgements)
209            .merge_default(&self.acknowledgements.into())
210            .enabled()
211    }
212
213    /// Gets the log namespacing to use. The passed in value is from the source itself
214    /// and will override any global default if it's set.
215    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
216        namespace
217            .or(self.schema.log_namespace)
218            .unwrap_or(false)
219            .into()
220    }
221}