use std::cell::RefCell;
use async_trait::async_trait;
use dyn_clone::DynClone;
use serde::Serialize;
use vector_lib::buffers::{BufferConfig, BufferType};
use vector_lib::configurable::attributes::CustomAttribute;
use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
use vector_lib::configurable::{
configurable_component, Configurable, GenerateError, Metadata, NamedComponent,
};
use vector_lib::{
config::{AcknowledgementsConfig, GlobalOptions, Input},
id::Inputs,
sink::VectorSink,
};
use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
use crate::extra_context::ExtraContext;
use crate::sinks::{util::UriSerde, Healthcheck};
pub type BoxedSink = Box<dyn SinkConfig>;
impl Configurable for BoxedSink {
fn referenceable_name() -> Option<&'static str> {
Some("vector::sinks::Sinks")
}
fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("Configurable sinks in Vector.");
metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
metadata
}
fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
vector_lib::configurable::component::SinkDescription::generate_schemas(gen)
}
}
impl<T: SinkConfig + 'static> From<T> for BoxedSink {
fn from(value: T) -> Self {
Box::new(value)
}
}
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "sink"))]
#[derive(Clone, Debug)]
pub struct SinkOuter<T>
where
T: Configurable + Serialize + 'static,
{
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub graph: GraphConfig,
#[configurable(derived)]
pub inputs: Inputs<T>,
#[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))]
healthcheck_uri: Option<UriSerde>,
#[configurable(derived, metadata(docs::advanced))]
#[serde(default, deserialize_with = "crate::serde::bool_or_struct")]
healthcheck: SinkHealthcheckOptions,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
pub buffer: BufferConfig,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
proxy: ProxyConfig,
#[serde(flatten)]
#[configurable(metadata(docs::hidden))]
pub inner: BoxedSink,
}
impl<T> SinkOuter<T>
where
T: Configurable + Serialize,
{
pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
where
I: IntoIterator<Item = T>,
IS: Into<BoxedSink>,
{
SinkOuter {
inputs: Inputs::from_iter(inputs),
buffer: Default::default(),
healthcheck: SinkHealthcheckOptions::default(),
healthcheck_uri: None,
inner: inner.into(),
proxy: Default::default(),
graph: Default::default(),
}
}
pub fn resources(&self, id: &ComponentKey) -> Vec<Resource> {
let mut resources = self.inner.resources();
for stage in self.buffer.stages() {
match stage {
BufferType::Memory { .. } => {}
BufferType::DiskV2 { .. } => resources.push(Resource::DiskBuffer(id.to_string())),
}
}
resources
}
pub fn healthcheck(&self) -> SinkHealthcheckOptions {
if self.healthcheck_uri.is_some() && self.healthcheck.uri.is_some() {
warn!("Both `healthcheck.uri` and `healthcheck_uri` options are specified. Using value of `healthcheck.uri`.")
} else if self.healthcheck_uri.is_some() {
warn!(
"The `healthcheck_uri` option has been deprecated, use `healthcheck.uri` instead."
)
}
SinkHealthcheckOptions {
uri: self
.healthcheck
.uri
.clone()
.or_else(|| self.healthcheck_uri.clone()),
..self.healthcheck.clone()
}
}
pub const fn proxy(&self) -> &ProxyConfig {
&self.proxy
}
pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> SinkOuter<U>
where
U: Configurable + Serialize,
{
let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
self.with_inputs(inputs)
}
pub(crate) fn with_inputs<I, U>(self, inputs: I) -> SinkOuter<U>
where
I: IntoIterator<Item = U>,
U: Configurable + Serialize,
{
SinkOuter {
inputs: Inputs::from_iter(inputs),
inner: self.inner,
buffer: self.buffer,
healthcheck: self.healthcheck,
healthcheck_uri: self.healthcheck_uri,
proxy: self.proxy,
graph: self.graph,
}
}
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct SinkHealthcheckOptions {
pub enabled: bool,
#[configurable(validation(format = "uri"))]
pub uri: Option<UriSerde>,
}
impl Default for SinkHealthcheckOptions {
fn default() -> Self {
Self {
enabled: true,
uri: None,
}
}
}
impl From<bool> for SinkHealthcheckOptions {
fn from(enabled: bool) -> Self {
Self { enabled, uri: None }
}
}
impl From<UriSerde> for SinkHealthcheckOptions {
fn from(uri: UriSerde) -> Self {
Self {
enabled: true,
uri: Some(uri),
}
}
}
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)>;
fn input(&self) -> Input;
fn resources(&self) -> Vec<Resource> {
Vec::new()
}
fn acknowledgements(&self) -> &AcknowledgementsConfig;
}
dyn_clone::clone_trait_object!(SinkConfig);
#[derive(Clone)]
pub struct SinkContext {
pub healthcheck: SinkHealthcheckOptions,
pub globals: GlobalOptions,
pub proxy: ProxyConfig,
pub schema: schema::Options,
pub app_name: String,
pub app_name_slug: String,
pub extra_context: ExtraContext,
}
impl Default for SinkContext {
fn default() -> Self {
Self {
healthcheck: Default::default(),
globals: Default::default(),
proxy: Default::default(),
schema: Default::default(),
app_name: crate::get_app_name().to_string(),
app_name_slug: crate::get_slugified_app_name(),
extra_context: Default::default(),
}
}
}
impl SinkContext {
pub const fn globals(&self) -> &GlobalOptions {
&self.globals
}
pub const fn proxy(&self) -> &ProxyConfig {
&self.proxy
}
}