1use std::{cell::RefCell, collections::HashMap};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7    attributes::CustomAttribute,
8    schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12    config::{
13        AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14        SourceOutput,
15    },
16    source::Source,
17};
18
19use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
20use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
21
22pub type BoxedSource = Box<dyn SourceConfig>;
23
24impl Configurable for BoxedSource {
25    fn referenceable_name() -> Option<&'static str> {
26        Some("vector::sources::Sources")
27    }
28
29    fn metadata() -> Metadata {
30        let mut metadata = Metadata::default();
31        metadata.set_description("Configurable sources in Vector.");
32        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
33        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
34        metadata
35    }
36
37    fn generate_schema(
38        generator: &RefCell<SchemaGenerator>,
39    ) -> Result<SchemaObject, GenerateError> {
40        vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
41    }
42}
43
44impl<T: SourceConfig + 'static> From<T> for BoxedSource {
45    fn from(value: T) -> Self {
46        Box::new(value)
47    }
48}
49
50#[configurable_component]
52#[configurable(metadata(docs::component_base_type = "source"))]
53#[derive(Clone, Debug)]
54pub struct SourceOuter {
55    #[configurable(derived)]
56    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
57    pub proxy: ProxyConfig,
58
59    #[configurable(derived)]
60    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
61    pub graph: GraphConfig,
62
63    #[serde(default, skip)]
64    pub sink_acknowledgements: bool,
65
66    #[configurable(metadata(docs::hidden))]
67    #[serde(flatten)]
68    pub(crate) inner: BoxedSource,
69}
70
71impl SourceOuter {
72    pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
73        Self {
74            proxy: Default::default(),
75            graph: Default::default(),
76            sink_acknowledgements: false,
77            inner: inner.into(),
78        }
79    }
80}
81
82#[async_trait]
84#[typetag::serde(tag = "type")]
85pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
86    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
95
96    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
98
99    fn resources(&self) -> Vec<Resource> {
107        Vec::new()
108    }
109
110    fn can_acknowledge(&self) -> bool;
123}
124
125dyn_clone::clone_trait_object!(SourceConfig);
126
127pub struct SourceContext {
128    pub key: ComponentKey,
129    pub globals: GlobalOptions,
130    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
131    pub shutdown: ShutdownSignal,
132    pub out: SourceSender,
133    pub proxy: ProxyConfig,
134    pub acknowledgements: bool,
135    pub schema: schema::Options,
136
137    pub schema_definitions: HashMap<Option<String>, schema::Definition>,
142
143    pub extra_context: ExtraContext,
146}
147
148impl SourceContext {
149    #[cfg(any(test, feature = "test-utils"))]
150    pub fn new_shutdown(
151        key: &ComponentKey,
152        out: SourceSender,
153    ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
154        let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
155        let (shutdown_signal, _) = shutdown.register_source(key, false);
156        (
157            Self {
158                key: key.clone(),
159                globals: GlobalOptions::default(),
160                enrichment_tables: Default::default(),
161                shutdown: shutdown_signal,
162                out,
163                proxy: Default::default(),
164                acknowledgements: false,
165                schema_definitions: HashMap::default(),
166                schema: Default::default(),
167                extra_context: Default::default(),
168            },
169            shutdown,
170        )
171    }
172
173    #[cfg(any(test, feature = "test-utils"))]
174    pub fn new_test(
175        out: SourceSender,
176        schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
177    ) -> Self {
178        Self {
179            key: ComponentKey::from("default"),
180            globals: GlobalOptions::default(),
181            enrichment_tables: Default::default(),
182            shutdown: ShutdownSignal::noop(),
183            out,
184            proxy: Default::default(),
185            acknowledgements: false,
186            schema_definitions: schema_definitions.unwrap_or_default(),
187            schema: Default::default(),
188            extra_context: Default::default(),
189        }
190    }
191
192    pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
193        let config = AcknowledgementsConfig::from(config);
194        if config.enabled() {
195            warn!(
196                message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
197                component_id = self.key.id(),
198            );
199        }
200
201        config
202            .merge_default(&self.globals.acknowledgements)
203            .merge_default(&self.acknowledgements.into())
204            .enabled()
205    }
206
207    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
210        namespace
211            .or(self.schema.log_namespace)
212            .unwrap_or(false)
213            .into()
214    }
215}