vector/config/
source.rs

1use std::{cell::RefCell, collections::HashMap};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7    attributes::CustomAttribute,
8    schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12    config::{
13        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14        SourceOutput,
15    },
16    source::Source,
17};
18
19use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
20use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
21
/// Owned, type-erased source configuration: any [`SourceConfig`] implementation behind a `Box`.
pub type BoxedSource = Box<dyn SourceConfig>;
23
24impl Configurable for BoxedSource {
25    fn referenceable_name() -> Option<&'static str> {
26        Some("vector::sources::Sources")
27    }
28
29    fn metadata() -> Metadata {
30        let mut metadata = Metadata::default();
31        metadata.set_description("Configurable sources in Vector.");
32        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
33        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
34        metadata
35    }
36
37    fn generate_schema(
38        generator: &RefCell<SchemaGenerator>,
39    ) -> Result<SchemaObject, GenerateError> {
40        vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
41    }
42}
43
44impl<T: SourceConfig + 'static> From<T> for BoxedSource {
45    fn from(value: T) -> Self {
46        Box::new(value)
47    }
48}
49
// Wrapper around a concrete source configuration (`inner`) carrying the settings every
// source has in common. Plain `//` comments are used for the notes below so the text seen
// by the `configurable_component` macro is left exactly as it was.
/// Fully resolved source component.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    // Proxy settings; defaulted when absent on input and, per the `skip_serializing_if`
    // predicate, presumably omitted from output when equal to the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Graph settings; handled by serde the same way as `proxy`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // Never read from or written to the serialized form (`skip`); takes the default
    // (`false`) on deserialization.
    // NOTE(review): presumably set elsewhere once it is known whether a connected sink
    // has acknowledgements enabled -- confirm against the topology builder.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    // The concrete source configuration; its fields are flattened into this struct's
    // serialized representation.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
70
71impl SourceOuter {
72    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
73        Self {
74            proxy: Default::default(),
75            graph: Default::default(),
76            sink_acknowledgements: false,
77            inner: inner.into(),
78        }
79    }
80}
81
82/// Generalized interface for describing and building source components.
83#[async_trait]
84#[typetag::serde(tag = "type")]
85pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
86    /// Builds the source with the given context.
87    ///
88    /// If the source is built successfully, `Ok(...)` is returned containing the source.
89    ///
90    /// # Errors
91    ///
92    /// If an error occurs while building the source, an error variant explaining the issue is
93    /// returned.
94    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
95
96    /// Gets the list of outputs exposed by this source.
97    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
98
99    /// Gets the list of resources, if any, used by this source.
100    ///
101    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
102    /// cannot be shared between components at runtime. This ensures that components can not be
103    /// configured in a way that would deadlock the spawning of a topology, and as well, allows
104    /// Vector to determine the correct order for rebuilding a topology during configuration reload
105    /// when resources must first be reclaimed before being reassigned, and so on.
106    fn resources(&self) -> Vec<Resource> {
107        Vec::new()
108    }
109
110    /// Whether or not this source can acknowledge the events it emits.
111    ///
112    /// Generally, Vector uses acknowledgements to track when an event has finally been processed,
113    /// either successfully or unsuccessfully. While it is used internally in some areas, such as
114    /// within disk buffers for knowing when a message can be deleted from the buffer, it is
115    /// primarily used to signal back to a source that a message has been successfully (durably)
116    /// processed or not.
117    ///
118    /// By exposing whether or not a source supports acknowledgements, we can avoid situations where
119    /// using acknowledgements would only add processing overhead for no benefit to the source, as
120    /// well as emit contextual warnings when end-to-end acknowledgements are enabled, but the
121    /// topology as configured does not actually support the use of end-to-end acknowledgements.
122    fn can_acknowledge(&self) -> bool;
123}
124
// Provides `Clone` for `Box<dyn SourceConfig>` (i.e. `BoxedSource`), which `SourceOuter`
// relies on for its derived `Clone`.
dyn_clone::clone_trait_object!(SourceConfig);
126
/// Context passed to [`SourceConfig::build`] when a source is constructed.
pub struct SourceContext {
    /// Key of the component being built; its `id()` is reported as `component_id` in the
    /// deprecation warning emitted by [`SourceContext::do_acknowledgements`].
    pub key: ComponentKey,
    /// Global options; `globals.acknowledgements` is used as a fallback by
    /// [`SourceContext::do_acknowledgements`].
    pub globals: GlobalOptions,
    /// Registry of enrichment tables.
    /// NOTE(review): presumably the registry shared across the topology -- confirm.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Shutdown signal for this source.
    /// NOTE(review): presumably completes when the source is asked to stop -- confirm.
    pub shutdown: ShutdownSignal,
    /// Sender handed to the source.
    /// NOTE(review): presumably the channel through which the source emits events -- confirm.
    pub out: SourceSender,
    /// Proxy configuration made available to the source.
    pub proxy: ProxyConfig,
    /// Acknowledgement flag applied as the last fallback by
    /// [`SourceContext::do_acknowledgements`].
    pub acknowledgements: bool,
    /// Schema options; `schema.log_namespace` is the fallback used by
    /// [`SourceContext::log_namespace`].
    pub schema: schema::Options,

    /// Tracks the schema IDs assigned to schemas exposed by the source.
    ///
    /// Given a source can expose multiple [`SourceOutput`] channels, the ID is tied to the identifier of
    /// that `SourceOutput`.
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
147
148impl SourceContext {
149    #[cfg(any(test, feature = "test-utils"))]
150    pub fn new_shutdown(
151        key: &ComponentKey,
152        out: SourceSender,
153    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
154        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
155        let (shutdown_signal, _) = shutdown.register_source(key, false);
156        (
157            Self {
158                key: key.clone(),
159                globals: GlobalOptions::default(),
160                enrichment_tables: Default::default(),
161                shutdown: shutdown_signal,
162                out,
163                proxy: Default::default(),
164                acknowledgements: false,
165                schema_definitions: HashMap::default(),
166                schema: Default::default(),
167                extra_context: Default::default(),
168            },
169            shutdown,
170        )
171    }
172
173    #[cfg(any(test, feature = "test-utils"))]
174    pub fn new_test(
175        out: SourceSender,
176        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
177    ) -> Self {
178        Self {
179            key: ComponentKey::from("default"),
180            globals: GlobalOptions::default(),
181            enrichment_tables: Default::default(),
182            shutdown: ShutdownSignal::noop(),
183            out,
184            proxy: Default::default(),
185            acknowledgements: false,
186            schema_definitions: schema_definitions.unwrap_or_default(),
187            schema: Default::default(),
188            extra_context: Default::default(),
189        }
190    }
191
192    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
193        let config = AcknowledgementsConfig::from(config);
194        if config.enabled() {
195            warn!(
196                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
197                component_id = self.key.id(),
198            );
199        }
200
201        config
202            .merge_default(&self.globals.acknowledgements)
203            .merge_default(&self.acknowledgements.into())
204            .enabled()
205    }
206
207    /// Gets the log namespacing to use. The passed in value is from the source itself
208    /// and will override any global default if it's set.
209    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
210        namespace
211            .or(self.schema.log_namespace)
212            .unwrap_or(false)
213            .into()
214    }
215}