1use std::{cell::RefCell, collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7 attributes::CustomAttribute,
8 schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12 config::{
13 AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14 SourceOutput,
15 },
16 source::Source,
17};
18
19use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
20use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
21
/// Heap-allocated, dynamically dispatched source configuration: any type
/// implementing [`SourceConfig`] behind a `Box`.
pub type BoxedSource = Box<dyn SourceConfig>;
23
24impl Configurable for BoxedSource {
25 fn referenceable_name() -> Option<&'static str> {
26 Some("vector::sources::Sources")
27 }
28
29 fn metadata() -> Metadata {
30 let mut metadata = Metadata::default();
31 metadata.set_description("Configurable sources in Vector.");
32 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
33 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
34 metadata
35 }
36
37 fn generate_schema(
38 generator: &RefCell<SchemaGenerator>,
39 ) -> Result<SchemaObject, GenerateError> {
40 vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
41 }
42}
43
44impl<T: SourceConfig + 'static> From<T> for BoxedSource {
45 fn from(value: T) -> Self {
46 Box::new(value)
47 }
48}
49
// Wrapper around a boxed source configuration that also carries the settings
// stored next to every source: `proxy`, `graph`, and the internal
// `sink_acknowledgements` flag.
//
// NOTE(review): plain `//` comments are used on purpose. `///` doc comments on a
// `#[configurable_component]` type are presumably picked up by the macro as schema
// descriptions -- confirm before converting any of these into doc comments.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
    // Proxy settings for this source. Falls back to the default when absent, and
    // is left out of serialized output when `vector_lib::serde::is_default`
    // reports true for it.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Graph settings for this source, with the same default/skip handling as
    // `proxy`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // Never read from or written to serialized configuration (`skip`); it takes
    // the `bool` default (`false`) on deserialization.
    // NOTE(review): presumably set programmatically when a connected sink enables
    // acknowledgements -- confirm against the code that mutates it.
    #[serde(default, skip)]
    pub sink_acknowledgements: bool,

    // The wrapped source configuration. Its fields are flattened into this
    // struct's serialized form, and the field itself is marked hidden in docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub(crate) inner: BoxedSource,
}
70
71impl SourceOuter {
72 pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
73 Self {
74 proxy: Default::default(),
75 graph: Default::default(),
76 sink_acknowledgements: false,
77 inner: inner.into(),
78 }
79 }
80}
81
/// Configuration-side interface implemented by every source.
///
/// Implementors must be cloneable through `dyn_clone`, name themselves via
/// `NamedComponent`, and be `Debug + Send + Sync`. The `typetag` attribute makes
/// `dyn SourceConfig` (de)serializable, with the concrete implementation
/// selected by the `type` tag.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the [`Source`] described by this configuration, using the
    /// supplied [`SourceContext`].
    ///
    /// # Errors
    ///
    /// Returns an error (as `crate::Result`) when the implementation cannot
    /// build the source.
    async fn build(&self, cx: SourceContext) -> crate::Result<Source>;

    /// Returns the outputs of this source.
    ///
    /// `global_log_namespace` is supplied by the caller; judging by its name it
    /// is the globally configured log namespace that an implementation may take
    /// into account -- confirm against callers.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;

    /// Returns the resources associated with this source.
    ///
    /// The default implementation reports none.
    // NOTE(review): presumably these are resources the source claims so that
    // conflicts between components can be detected -- confirm.
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }

    /// Reports whether this source can acknowledge.
    // NOTE(review): presumably refers to event acknowledgements -- confirm.
    fn can_acknowledge(&self) -> bool;

    /// Optional timeout associated with sends from this source.
    ///
    /// The default implementation returns `None`.
    // NOTE(review): how `None` is interpreted is decided by the caller, which
    // is not visible here.
    fn send_timeout(&self) -> Option<Duration> {
        None
    }
}
130
// Implements `Clone` for `Box<dyn SourceConfig>` (and therefore `BoxedSource`) via
// `dyn_clone`, which is what allows types holding a boxed source to derive `Clone`.
dyn_clone::clone_trait_object!(SourceConfig);
132
/// Everything handed to [`SourceConfig::build`] when a source is constructed.
pub struct SourceContext {
    /// Key of the source component; its `id()` is reported as `component_id` in
    /// the deprecation warning emitted by `do_acknowledgements`.
    pub key: ComponentKey,
    /// Global options; `do_acknowledgements` reads `globals.acknowledgements`
    /// as a fallback.
    pub globals: GlobalOptions,
    /// Registry of enrichment tables made available to the source.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Shutdown signal for this source.
    pub shutdown: ShutdownSignal,
    /// Sender handed to the source.
    // NOTE(review): presumably the channel on which the source emits its
    // events -- confirm.
    pub out: SourceSender,
    /// Proxy settings for this source.
    pub proxy: ProxyConfig,
    /// Acknowledgement flag merged in last by `do_acknowledgements`, after the
    /// source-level and global settings.
    pub acknowledgements: bool,
    /// Schema options; `log_namespace` falls back to `schema.log_namespace`.
    pub schema: schema::Options,

    /// Schema definitions keyed by an optional string.
    // NOTE(review): the key is presumably the output port name, with `None` for
    // the default output -- confirm.
    pub schema_definitions: HashMap<Option<String>, schema::Definition>,

    /// Additional context passed through to the source.
    pub extra_context: ExtraContext,
}
153
154impl SourceContext {
155 #[cfg(any(test, feature = "test-utils"))]
156 pub fn new_shutdown(
157 key: &ComponentKey,
158 out: SourceSender,
159 ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
160 let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
161 let (shutdown_signal, _) = shutdown.register_source(key, false);
162 (
163 Self {
164 key: key.clone(),
165 globals: GlobalOptions::default(),
166 enrichment_tables: Default::default(),
167 shutdown: shutdown_signal,
168 out,
169 proxy: Default::default(),
170 acknowledgements: false,
171 schema_definitions: HashMap::default(),
172 schema: Default::default(),
173 extra_context: Default::default(),
174 },
175 shutdown,
176 )
177 }
178
179 #[cfg(any(test, feature = "test-utils"))]
180 pub fn new_test(
181 out: SourceSender,
182 schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
183 ) -> Self {
184 Self {
185 key: ComponentKey::from("default"),
186 globals: GlobalOptions::default(),
187 enrichment_tables: Default::default(),
188 shutdown: ShutdownSignal::noop(),
189 out,
190 proxy: Default::default(),
191 acknowledgements: false,
192 schema_definitions: schema_definitions.unwrap_or_default(),
193 schema: Default::default(),
194 extra_context: Default::default(),
195 }
196 }
197
198 pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
199 let config = AcknowledgementsConfig::from(config);
200 if config.enabled() {
201 warn!(
202 message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
203 component_id = self.key.id(),
204 );
205 }
206
207 config
208 .merge_default(&self.globals.acknowledgements)
209 .merge_default(&self.acknowledgements.into())
210 .enabled()
211 }
212
213 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
216 namespace
217 .or(self.schema.log_namespace)
218 .unwrap_or(false)
219 .into()
220 }
221}