1use std::{cell::RefCell, collections::HashMap};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7 attributes::CustomAttribute,
8 schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12 config::{
13 AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14 SourceOutput,
15 },
16 source::Source,
17};
18
19use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
20use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
21
22pub type BoxedSource = Box<dyn SourceConfig>;
23
24impl Configurable for BoxedSource {
25 fn referenceable_name() -> Option<&'static str> {
26 Some("vector::sources::Sources")
27 }
28
29 fn metadata() -> Metadata {
30 let mut metadata = Metadata::default();
31 metadata.set_description("Configurable sources in Vector.");
32 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
33 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
34 metadata
35 }
36
37 fn generate_schema(
38 generator: &RefCell<SchemaGenerator>,
39 ) -> Result<SchemaObject, GenerateError> {
40 vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
41 }
42}
43
44impl<T: SourceConfig + 'static> From<T> for BoxedSource {
45 fn from(value: T) -> Self {
46 Box::new(value)
47 }
48}
49
50#[configurable_component]
52#[configurable(metadata(docs::component_base_type = "source"))]
53#[derive(Clone, Debug)]
54pub struct SourceOuter {
55 #[configurable(derived)]
56 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
57 pub proxy: ProxyConfig,
58
59 #[configurable(derived)]
60 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
61 pub graph: GraphConfig,
62
63 #[serde(default, skip)]
64 pub sink_acknowledgements: bool,
65
66 #[configurable(metadata(docs::hidden))]
67 #[serde(flatten)]
68 pub(crate) inner: BoxedSource,
69}
70
71impl SourceOuter {
72 pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
73 Self {
74 proxy: Default::default(),
75 graph: Default::default(),
76 sink_acknowledgements: false,
77 inner: inner.into(),
78 }
79 }
80}
81
82#[async_trait]
84#[typetag::serde(tag = "type")]
85pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
86 async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
95
96 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
98
99 fn resources(&self) -> Vec<Resource> {
107 Vec::new()
108 }
109
110 fn can_acknowledge(&self) -> bool;
123}
124
125dyn_clone::clone_trait_object!(SourceConfig);
126
127pub struct SourceContext {
128 pub key: ComponentKey,
129 pub globals: GlobalOptions,
130 pub enrichment_tables: vector_lib::enrichment::TableRegistry,
131 pub shutdown: ShutdownSignal,
132 pub out: SourceSender,
133 pub proxy: ProxyConfig,
134 pub acknowledgements: bool,
135 pub schema: schema::Options,
136
137 pub schema_definitions: HashMap<Option<String>, schema::Definition>,
142
143 pub extra_context: ExtraContext,
146}
147
148impl SourceContext {
149 #[cfg(any(test, feature = "test-utils"))]
150 pub fn new_shutdown(
151 key: &ComponentKey,
152 out: SourceSender,
153 ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
154 let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
155 let (shutdown_signal, _) = shutdown.register_source(key, false);
156 (
157 Self {
158 key: key.clone(),
159 globals: GlobalOptions::default(),
160 enrichment_tables: Default::default(),
161 shutdown: shutdown_signal,
162 out,
163 proxy: Default::default(),
164 acknowledgements: false,
165 schema_definitions: HashMap::default(),
166 schema: Default::default(),
167 extra_context: Default::default(),
168 },
169 shutdown,
170 )
171 }
172
173 #[cfg(any(test, feature = "test-utils"))]
174 pub fn new_test(
175 out: SourceSender,
176 schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
177 ) -> Self {
178 Self {
179 key: ComponentKey::from("default"),
180 globals: GlobalOptions::default(),
181 enrichment_tables: Default::default(),
182 shutdown: ShutdownSignal::noop(),
183 out,
184 proxy: Default::default(),
185 acknowledgements: false,
186 schema_definitions: schema_definitions.unwrap_or_default(),
187 schema: Default::default(),
188 extra_context: Default::default(),
189 }
190 }
191
192 pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
193 let config = AcknowledgementsConfig::from(config);
194 if config.enabled() {
195 warn!(
196 message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
197 component_id = self.key.id(),
198 );
199 }
200
201 config
202 .merge_default(&self.globals.acknowledgements)
203 .merge_default(&self.acknowledgements.into())
204 .enabled()
205 }
206
207 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
210 namespace
211 .or(self.schema.log_namespace)
212 .unwrap_or(false)
213 .into()
214 }
215}