1use std::{cell::RefCell, collections::HashMap, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
6use vector_config_common::{
7 attributes::CustomAttribute,
8 schema::{SchemaGenerator, SchemaObject},
9};
10use vector_config_macros::configurable_component;
11use vector_lib::{
12 config::{
13 AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
14 SourceOutput,
15 },
16 source::Source,
17};
18use vector_vrl_metrics::MetricsStorage;
19
20use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
21use crate::{SourceSender, extra_context::ExtraContext, shutdown::ShutdownSignal};
22
23pub type BoxedSource = Box<dyn SourceConfig>;
24
25impl Configurable for BoxedSource {
26 fn referenceable_name() -> Option<&'static str> {
27 Some("vector::sources::Sources")
28 }
29
30 fn metadata() -> Metadata {
31 let mut metadata = Metadata::default();
32 metadata.set_description("Configurable sources in Vector.");
33 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
34 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
35 metadata
36 }
37
38 fn generate_schema(
39 generator: &RefCell<SchemaGenerator>,
40 ) -> Result<SchemaObject, GenerateError> {
41 vector_lib::configurable::component::SourceDescription::generate_schemas(generator)
42 }
43}
44
45impl<T: SourceConfig + 'static> From<T> for BoxedSource {
46 fn from(value: T) -> Self {
47 Box::new(value)
48 }
49}
50
51#[configurable_component]
53#[configurable(metadata(docs::component_base_type = "source"))]
54#[derive(Clone, Debug)]
55pub struct SourceOuter {
56 #[configurable(derived)]
57 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
58 pub proxy: ProxyConfig,
59
60 #[configurable(derived)]
61 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
62 pub graph: GraphConfig,
63
64 #[serde(default, skip)]
65 pub sink_acknowledgements: bool,
66
67 #[configurable(metadata(docs::hidden))]
68 #[serde(flatten)]
69 pub(crate) inner: BoxedSource,
70}
71
72impl SourceOuter {
73 pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
74 Self {
75 proxy: Default::default(),
76 graph: Default::default(),
77 sink_acknowledgements: false,
78 inner: inner.into(),
79 }
80 }
81}
82
83#[async_trait]
85#[typetag::serde(tag = "type")]
86pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
87 async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
96
97 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
99
100 fn resources(&self) -> Vec<Resource> {
108 Vec::new()
109 }
110
111 fn can_acknowledge(&self) -> bool;
124
125 fn send_timeout(&self) -> Option<Duration> {
128 None
129 }
130}
131
132dyn_clone::clone_trait_object!(SourceConfig);
133
134pub struct SourceContext {
135 pub key: ComponentKey,
136 pub globals: GlobalOptions,
137 pub enrichment_tables: vector_lib::enrichment::TableRegistry,
138 pub metrics_storage: MetricsStorage,
139 pub shutdown: ShutdownSignal,
140 pub out: SourceSender,
141 pub proxy: ProxyConfig,
142 pub acknowledgements: bool,
143 pub schema: schema::Options,
144
145 pub schema_definitions: HashMap<Option<String>, schema::Definition>,
150
151 pub extra_context: ExtraContext,
154}
155
156impl SourceContext {
157 #[cfg(any(test, feature = "test-utils"))]
158 pub fn new_shutdown(
159 key: &ComponentKey,
160 out: SourceSender,
161 ) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
162 let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
163 let (shutdown_signal, _) = shutdown.register_source(key, false);
164 (
165 Self {
166 key: key.clone(),
167 globals: GlobalOptions::default(),
168 enrichment_tables: Default::default(),
169 metrics_storage: Default::default(),
170 shutdown: shutdown_signal,
171 out,
172 proxy: Default::default(),
173 acknowledgements: false,
174 schema_definitions: HashMap::default(),
175 schema: Default::default(),
176 extra_context: Default::default(),
177 },
178 shutdown,
179 )
180 }
181
182 #[cfg(any(test, feature = "test-utils"))]
183 pub fn new_test(
184 out: SourceSender,
185 schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
186 ) -> Self {
187 Self {
188 key: ComponentKey::from("default"),
189 globals: GlobalOptions::default(),
190 enrichment_tables: Default::default(),
191 metrics_storage: Default::default(),
192 shutdown: ShutdownSignal::noop(),
193 out,
194 proxy: Default::default(),
195 acknowledgements: false,
196 schema_definitions: schema_definitions.unwrap_or_default(),
197 schema: Default::default(),
198 extra_context: Default::default(),
199 }
200 }
201
202 pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
203 let config = AcknowledgementsConfig::from(config);
204 if config.enabled() {
205 warn!(
206 message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
207 component_id = self.key.id(),
208 );
209 }
210
211 config
212 .merge_default(&self.globals.acknowledgements)
213 .merge_default(&self.acknowledgements.into())
214 .enabled()
215 }
216
217 pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
220 namespace
221 .or(self.schema.log_namespace)
222 .unwrap_or(false)
223 .into()
224 }
225}