vector/config/
source.rs

1use std::{cell::RefCell, collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7    attributes::CustomAttribute,
8    schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12    config::{
13        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14        SourceOutput,
15    },
16    source::Source,
17};
18use vector_vrl_metrics::MetricsStorage;
19
20use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
21use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
22
/// Heap-allocated trait object holding a source configuration behind the [`SourceConfig`] trait.
pub type BoxedSource = Box<dyn SourceConfig>;
24
25impl Configurable for BoxedSource {
26    fn referenceable_name() -> Option<&'static str> {
27        Some("vector::sources::Sources")
28    }
29
30    fn metadata() -> Metadata {
31        let mut metadata = Metadata::default();
32        metadata.set_description("Configurable sources in Vector.");
33        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
34        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
35        metadata
36    }
37
38    fn generate_schema(
39        generator: &RefCell<SchemaGenerator>,
40    ) -> Result<SchemaObject, GenerateError> {
41        vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
42    }
43}
44
45impl<T: SourceConfig + 'static> From<T> for BoxedSource {
46    fn from(value: T) -> Self {
47        Box::new(value)
48    }
49}
50
/// Fully resolved source component.
// NOTE: plain `//` comments are used for the field notes below so that the text is not picked
// up as a doc attribute by the `configurable_component` machinery.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    // Proxy settings for this source. Falls back to `ProxyConfig::default()` when absent from
    // the input and is omitted on serialization when `vector_lib::serde::is_default` says so.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Graph settings for this source, with the same default/skip handling as `proxy`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // Never read from or written to the serialized configuration (`skip`); starts out as the
    // `bool` default (`false`).
    // NOTE(review): presumably set elsewhere when a downstream sink enables acknowledgements --
    // confirm against the topology builder.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    // The source-specific configuration, flattened into the same level as the fields above and
    // hidden from the generated documentation.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
71
72impl SourceOuter {
73    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
74        Self {
75            proxy: Default::default(),
76            graph: Default::default(),
77            sink_acknowledgements: false,
78            inner: inner.into(),
79        }
80    }
81}
82
/// Generalized interface for describing and building source components.
///
/// Implementations are (de)serialized as trait objects via `typetag`, using the `type` field as
/// the tag.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the source with the given context.
    ///
    /// If the source is built successfully, `Ok(...)` is returned containing the source.
    ///
    /// # Errors
    ///
    /// If an error occurs while building the source, an error variant explaining the issue is
    /// returned.
    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;

    /// Gets the list of outputs exposed by this source.
    ///
    /// `global_log_namespace` is the globally configured log namespace, passed in so the source
    /// can take it into account when describing its outputs.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;

    /// Gets the list of resources, if any, used by this source.
    ///
    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
    /// cannot be shared between components at runtime. Declaring them ensures that components
    /// cannot be configured in a way that would deadlock the spawning of a topology, and also
    /// allows Vector to determine the correct order for rebuilding a topology during a
    /// configuration reload, when resources must first be reclaimed before being reassigned.
    ///
    /// The default implementation reports no resources.
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }

    /// Whether or not this source can acknowledge the events it emits.
    ///
    /// Generally, Vector uses acknowledgements to track when an event has finally been processed,
    /// either successfully or unsuccessfully. While it is used internally in some areas, such as
    /// within disk buffers for knowing when a message can be deleted from the buffer, it is
    /// primarily used to signal back to a source that a message has been successfully (durably)
    /// processed or not.
    ///
    /// By exposing whether or not a source supports acknowledgements, we can avoid situations
    /// where using acknowledgements would only add processing overhead for no benefit to the
    /// source, and we can emit contextual warnings when end-to-end acknowledgements are enabled
    /// but the topology as configured does not actually support them.
    fn can_acknowledge(&self) -> bool;

    /// If this source supports timeout returns from the `SourceSender` and the configuration
    /// provides a timeout value, return it here and the `out` channel will be configured with it.
    ///
    /// The default implementation returns `None`, i.e. no timeout.
    fn send_timeout(&self) -> Option<Duration> {
        None
    }
}

// Generates the `Clone` implementation for boxed `SourceConfig` trait objects, which is what
// allows types holding a `BoxedSource` to derive `Clone`.
dyn_clone::clone_trait_object!(SourceConfig);
133
/// Context handed to [`SourceConfig::build`] when a source is constructed.
pub struct SourceContext {
    /// Key of the component being built; its ID is attached to diagnostics emitted by
    /// `do_acknowledgements`.
    pub key: ComponentKey,
    /// Global options; `globals.acknowledgements` serves as a fallback in
    /// `do_acknowledgements`.
    pub globals: GlobalOptions,
    /// Registry of enrichment tables made available to the source.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Metrics storage handle.
    // NOTE(review): presumably backs the VRL metrics functions -- confirm against
    // `vector_vrl_metrics`.
    pub metrics_storage: MetricsStorage,
    /// Shutdown signal for this source.
    pub shutdown: ShutdownSignal,
    /// Sender side of the source's output.
    // NOTE(review): presumably the channel the source pushes its events into -- confirm against
    // `SourceSender`.
    pub out: SourceSender,
    /// Proxy settings for the source.
    pub proxy: ProxyConfig,
    /// Acknowledgement flag used as the last fallback in `do_acknowledgements`, after the
    /// source-level and global settings.
    pub acknowledgements: bool,
    /// Schema options; `schema.log_namespace` is the fallback consulted by `log_namespace`.
    pub schema: schema::Options,

    /// Tracks the schema IDs assigned to schemas exposed by the source.
    ///
    /// Given a source can expose multiple [`SourceOutput`] channels, the ID is tied to the identifier of
    /// that `SourceOutput`.
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
155
156impl SourceContext {
157    #[cfg(any(test, feature = "test-utils"))]
158    pub fn new_shutdown(
159        key: &ComponentKey,
160        out: SourceSender,
161    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
162        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
163        let (shutdown_signal, _) = shutdown.register_source(key, false);
164        (
165            Self {
166                key: key.clone(),
167                globals: GlobalOptions::default(),
168                enrichment_tables: Default::default(),
169                metrics_storage: Default::default(),
170                shutdown: shutdown_signal,
171                out,
172                proxy: Default::default(),
173                acknowledgements: false,
174                schema_definitions: HashMap::default(),
175                schema: Default::default(),
176                extra_context: Default::default(),
177            },
178            shutdown,
179        )
180    }
181
182    #[cfg(any(test, feature = "test-utils"))]
183    pub fn new_test(
184        out: SourceSender,
185        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
186    ) -> Self {
187        Self {
188            key: ComponentKey::from("default"),
189            globals: GlobalOptions::default(),
190            enrichment_tables: Default::default(),
191            metrics_storage: Default::default(),
192            shutdown: ShutdownSignal::noop(),
193            out,
194            proxy: Default::default(),
195            acknowledgements: false,
196            schema_definitions: schema_definitions.unwrap_or_default(),
197            schema: Default::default(),
198            extra_context: Default::default(),
199        }
200    }
201
202    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
203        let config = AcknowledgementsConfig::from(config);
204        if config.enabled() {
205            warn!(
206                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
207                component_id = self.key.id(),
208            );
209        }
210
211        config
212            .merge_default(&self.globals.acknowledgements)
213            .merge_default(&self.acknowledgements.into())
214            .enabled()
215    }
216
217    /// Gets the log namespacing to use. The passed in value is from the source itself
218    /// and will override any global default if it's set.
219    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
220        namespace
221            .or(self.schema.log_namespace)
222            .unwrap_or(false)
223            .into()
224    }
225}