1use std::cell::RefCell;
2use std::collections::HashMap;
3
4use async_trait::async_trait;
5use dyn_clone::DynClone;
6use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
7use vector_config_common::attributes::CustomAttribute;
8use vector_config_common::schema::{SchemaGenerator, SchemaObject};
9use vector_config_macros::configurable_component;
10use vector_lib::{
11 config::{
12 AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
13 SourceOutput,
14 },
15 source::Source,
16};
17
18use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
19use crate::{extra_context::ExtraContext, shutdown::ShutdownSignal, SourceSender};
20
/// A boxed, dynamically dispatched [`SourceConfig`] trait object.
pub type BoxedSource = Box<dyn SourceConfig>;
22
23impl Configurable for BoxedSource {
24 fn referenceable_name() -> Option<&'static str> {
25 Some("vector::sources::Sources")
26 }
27
28 fn metadata() -> Metadata {
29 let mut metadata = Metadata::default();
30 metadata.set_description("Configurable sources in Vector.");
31 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
32 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
33 metadata
34 }
35
36 fn generate_schema(
37 generator: &RefCell<SchemaGenerator>,
38 ) -> Result<SchemaObject, GenerateError> {
39 vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
40 }
41}
42
43impl<T: SourceConfig + 'static> From<T> for BoxedSource {
44 fn from(value: T) -> Self {
45 Box::new(value)
46 }
47}
48
/// Fully resolved source component.
///
/// Wraps a concrete source configuration (`inner`) together with settings
/// that are common to every source.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    /// Proxy settings for this source.
    ///
    /// Omitted from serialized output when equal to the default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    /// Graph settings for this source.
    ///
    /// Omitted from serialized output when equal to the default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    /// Skipped during both serialization and deserialization
    /// (`serde(skip)`); takes its default value (`false`) when deserialized.
    // NOTE(review): presumably set when a connected sink has acknowledgements
    // enabled — confirm against the code that assigns this field.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    /// The concrete source configuration.
    ///
    /// Its fields are flattened into this struct when (de)serialized, and it
    /// is hidden from generated documentation.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
69
70impl SourceOuter {
71 pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
72 Self {
73 proxy: Default::default(),
74 graph: Default::default(),
75 sink_acknowledgements: false,
76 inner: inner.into(),
77 }
78 }
79}
80
/// Generalized interface for describing and building source components.
///
/// Implementations are (de)serialized through `typetag`, using the `type`
/// field as the tag.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the source from this configuration and the given context.
    ///
    /// # Errors
    ///
    /// Returns an error if the source cannot be built; the failure conditions
    /// are defined by each implementation.
    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;

    /// Returns the outputs of this source, given the global log namespace.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;

    /// Returns the resources associated with this source.
    ///
    /// The default implementation returns an empty list.
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }

    /// Reports whether this source supports acknowledgements.
    // NOTE(review): presumably this means acknowledging the events the source
    // emits — confirm against the implementations.
    fn can_acknowledge(&self) -> bool;
}
123
// Makes `Box<dyn SourceConfig>` cloneable, which `SourceOuter`'s derived
// `Clone` relies on for its boxed `inner` field.
dyn_clone::clone_trait_object!(SourceConfig);
125
/// Context passed to [`SourceConfig::build`] when building a source.
pub struct SourceContext {
    /// Key identifying the component being built.
    pub key: ComponentKey,
    /// Global options.
    pub globals: GlobalOptions,
    /// Registry of enrichment tables.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Shutdown signal for the source.
    pub shutdown: ShutdownSignal,
    /// Sender for the source's output.
    // NOTE(review): presumably carries the events the source emits — confirm.
    pub out: SourceSender,
    /// Proxy settings for the source.
    pub proxy: ProxyConfig,
    /// Acknowledgements flag; merged as a default in
    /// `SourceContext::do_acknowledgements`.
    pub acknowledgements: bool,
    /// Schema options; `SourceContext::log_namespace` reads `log_namespace`
    /// from here as a fallback.
    pub schema: schema::Options,

    /// Schema definitions keyed by an optional string.
    // NOTE(review): presumably the key is the output port name, with `None`
    // for the default output — confirm against the caller that fills this map.
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Additional context supplied from outside the component.
    // NOTE(review): exact contents are defined by `ExtraContext`; see that type.
    pub extra_context: ExtraContext,
}
146
147impl SourceContext {
148 #[cfg(any(test, feature = "test-utils"))]
149 pub fn new_shutdown(
150 key: &ComponentKey,
151 out: SourceSender,
152 ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
153 let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
154 let (shutdown_signal, _) = shutdown.register_source(key, false);
155 (
156 Self {
157 key: key.clone(),
158 globals: GlobalOptions::default(),
159 enrichment_tables: Default::default(),
160 shutdown: shutdown_signal,
161 out,
162 proxy: Default::default(),
163 acknowledgements: false,
164 schema_definitions: HashMap::default(),
165 schema: Default::default(),
166 extra_context: Default::default(),
167 },
168 shutdown,
169 )
170 }
171
172 #[cfg(any(test, feature = "test-utils"))]
173 pub fn new_test(
174 out: SourceSender,
175 schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
176 ) -> Self {
177 Self {
178 key: ComponentKey::from("default"),
179 globals: GlobalOptions::default(),
180 enrichment_tables: Default::default(),
181 shutdown: ShutdownSignal::noop(),
182 out,
183 proxy: Default::default(),
184 acknowledgements: false,
185 schema_definitions: schema_definitions.unwrap_or_default(),
186 schema: Default::default(),
187 extra_context: Default::default(),
188 }
189 }
190
191 pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
192 let config = AcknowledgementsConfig::from(config);
193 if config.enabled() {
194 warn!(
195 message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
196 component_id = self.key.id(),
197 );
198 }
199
200 config
201 .merge_default(&self.globals.acknowledgements)
202 .merge_default(&self.acknowledgements.into())
203 .enabled()
204 }
205
206 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
209 namespace
210 .or(self.schema.log_namespace)
211 .unwrap_or(false)
212 .into()
213 }
214}