use std::cell::RefCell;
use std::collections::HashMap;
use async_trait::async_trait;
use dyn_clone::DynClone;
use vector_config::{Configurable, GenerateError, Metadata, NamedComponent};
use vector_config_common::attributes::CustomAttribute;
use vector_config_common::schema::{SchemaGenerator, SchemaObject};
use vector_config_macros::configurable_component;
use vector_lib::{
config::{
AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
SourceOutput,
},
source::Source,
};
use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
use crate::{extra_context::ExtraContext, shutdown::ShutdownSignal, SourceSender};
/// Heap-allocated, type-erased source configuration (any `SourceConfig` implementor).
pub type BoxedSource = Box<dyn SourceConfig>;
/// Lets a boxed source configuration participate in configuration schema generation.
impl Configurable for BoxedSource {
    /// Name under which this type is referenced in the generated schema.
    fn referenceable_name() -> Option<&'static str> {
        Some("vector::sources::Sources")
    }

    /// Metadata describing the set of sources: a description plus the custom
    /// attributes declaring internal enum tagging on the `type` field.
    fn metadata() -> Metadata {
        let tagging = CustomAttribute::kv("docs::enum_tagging", "internal");
        let tag_field = CustomAttribute::kv("docs::enum_tag_field", "type");

        let mut meta = Metadata::default();
        meta.set_description("Configurable sources in Vector.");
        meta.add_custom_attribute(tagging);
        meta.add_custom_attribute(tag_field);
        meta
    }

    /// Delegates schema generation to `SourceDescription::generate_schemas`.
    fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
        use vector_lib::configurable::component::SourceDescription;

        SourceDescription::generate_schemas(gen)
    }
}
/// Any concrete, `'static` source configuration can be converted into a `BoxedSource`.
impl<T: SourceConfig + 'static> From<T> for BoxedSource {
    /// Moves `value` onto the heap and erases its concrete type.
    fn from(value: T) -> Self {
        let boxed: Box<dyn SourceConfig> = Box::new(value);
        boxed
    }
}
/// Fully resolved source component: the source-specific configuration together with
/// the proxy and graph settings that accompany it.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "source"))]
#[derive(Clone, Debug)]
pub struct SourceOuter {
// Proxy settings for this source; omitted from serialized output when equal to the default.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub proxy: ProxyConfig,
// Graph settings for this source; omitted from serialized output when equal to the default.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub graph: GraphConfig,
// Never read from or written to configuration (`serde(skip)`); starts out `false`.
// NOTE(review): presumably set elsewhere when a downstream sink requests acknowledgements — confirm.
#[serde(default, skip)]
pub sink_acknowledgements: bool,
// The source-specific configuration, flattened into the same level as the fields above
// and hidden from the generated documentation.
#[configurable(metadata(docs::hidden))]
#[serde(flatten)]
pub(crate) inner: BoxedSource,
}
impl SourceOuter {
pub(crate) fn new<I: Into<BoxedSource>>(inner: I) -> Self {
Self {
proxy: Default::default(),
graph: Default::default(),
sink_acknowledgements: false,
inner: inner.into(),
}
}
}
/// Interface implemented by every source configuration type.
///
/// Implementors are (de)serialized through `typetag`, using the `type` field as the tag,
/// and must be cloneable (`DynClone`), named (`NamedComponent`), `Debug`, `Send` and `Sync`.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
/// Builds the source described by this configuration using the supplied context.
///
/// # Errors
///
/// Returns an error if the source cannot be built.
async fn build(&self, cx: SourceContext) -> crate::Result<Source>;
/// Returns the outputs of this source, given the globally configured log namespace.
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput>;
/// Returns the resources associated with this source.
///
/// The default implementation reports none.
fn resources(&self) -> Vec<Resource> {
Vec::new()
}
/// Reports whether this source can acknowledge.
///
/// NOTE(review): presumably refers to end-to-end event acknowledgements — confirm.
fn can_acknowledge(&self) -> bool;
}
// Implements `Clone` for `Box<dyn SourceConfig>`, which `SourceOuter`'s derived `Clone` relies on.
dyn_clone::clone_trait_object!(SourceConfig);
/// Everything handed to `SourceConfig::build` when a source is constructed.
pub struct SourceContext {
/// Key of the component being built; its id is attached to log messages as `component_id`.
pub key: ComponentKey,
/// Global options; `globals.acknowledgements` is consulted by `do_acknowledgements`.
pub globals: GlobalOptions,
/// Shutdown signal for this source.
pub shutdown: ShutdownSignal,
/// Sender handed to the source.
/// NOTE(review): presumably the channel through which the source emits events — confirm.
pub out: SourceSender,
/// Proxy settings for this source.
pub proxy: ProxyConfig,
/// Acknowledgement flag merged in last by `do_acknowledgements`.
pub acknowledgements: bool,
/// Schema options; `schema.log_namespace` is the fallback used by `log_namespace`.
pub schema: schema::Options,
/// Schema definitions keyed by an optional string.
/// NOTE(review): the key is presumably the output port name (`None` for the default output) — confirm.
pub schema_definitions: HashMap<Option<String>, schema::Definition>,
/// Additional context passed through to the source.
pub extra_context: ExtraContext,
}
impl SourceContext {
#[cfg(any(test, feature = "test-utils"))]
pub fn new_shutdown(
key: &ComponentKey,
out: SourceSender,
) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(key, false);
(
Self {
key: key.clone(),
globals: GlobalOptions::default(),
shutdown: shutdown_signal,
out,
proxy: Default::default(),
acknowledgements: false,
schema_definitions: HashMap::default(),
schema: Default::default(),
extra_context: Default::default(),
},
shutdown,
)
}
#[cfg(any(test, feature = "test-utils"))]
pub fn new_test(
out: SourceSender,
schema_definitions: Option<HashMap<Option<String>, schema::Definition>>,
) -> Self {
Self {
key: ComponentKey::from("default"),
globals: GlobalOptions::default(),
shutdown: ShutdownSignal::noop(),
out,
proxy: Default::default(),
acknowledgements: false,
schema_definitions: schema_definitions.unwrap_or_default(),
schema: Default::default(),
extra_context: Default::default(),
}
}
pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
let config = AcknowledgementsConfig::from(config);
if config.enabled() {
warn!(
message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
component_id = self.key.id(),
);
}
config
.merge_default(&self.globals.acknowledgements)
.merge_default(&self.acknowledgements.into())
.enabled()
}
pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
namespace
.or(self.schema.log_namespace)
.unwrap_or(false)
.into()
}
}