vector/config/
sink.rs

1use std::{cell::RefCell, path::PathBuf, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use serde::Serialize;
6use serde_with::serde_as;
7use vector_lib::{
8    buffers::{BufferConfig, BufferType},
9    config::{AcknowledgementsConfig, GlobalOptions, Input},
10    configurable::{
11        Configurable, GenerateError, Metadata, NamedComponent,
12        attributes::CustomAttribute,
13        configurable_component,
14        schema::{SchemaGenerator, SchemaObject},
15    },
16    id::Inputs,
17    sink::VectorSink,
18};
19
20use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
21use crate::{
22    extra_context::ExtraContext,
23    sinks::{Healthcheck, util::UriSerde},
24};
25
26pub type BoxedSink = Box<dyn SinkConfig>;
27
28impl Configurable for BoxedSink {
29    fn referenceable_name() -> Option<&'static str> {
30        Some("vector::sinks::Sinks")
31    }
32
33    fn metadata() -> Metadata {
34        let mut metadata = Metadata::default();
35        metadata.set_description("Configurable sinks in Vector.");
36        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
37        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
38        metadata
39    }
40
41    fn generate_schema(
42        generator: &RefCell<SchemaGenerator>,
43    ) -> Result<SchemaObject, GenerateError> {
44        vector_lib::configurable::component::SinkDescription::generate_schemas(generator)
45    }
46}
47
48impl<T: SinkConfig + 'static> From<T> for BoxedSink {
49    fn from(value: T) -> Self {
50        Box::new(value)
51    }
52}
53
54/// Fully resolved sink component.
55#[configurable_component]
56#[configurable(metadata(docs::component_base_type = "sink"))]
57#[derive(Clone, Debug)]
58pub struct SinkOuter<T>
59where
60    T: Configurable + Serialize + 'static,
61{
62    #[configurable(derived)]
63    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
64    pub graph: GraphConfig,
65
66    #[configurable(derived)]
67    pub inputs: Inputs<T>,
68
69    /// The full URI to make HTTP healthcheck requests to.
70    ///
71    /// This must be a valid URI, which requires at least the scheme and host. All other
72    /// components -- port, path, etc -- are allowed as well.
73    #[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))]
74    pub healthcheck_uri: Option<UriSerde>,
75
76    #[configurable(derived, metadata(docs::advanced))]
77    #[serde(default, deserialize_with = "crate::serde::bool_or_struct")]
78    pub healthcheck: SinkHealthcheckOptions,
79
80    #[configurable(derived)]
81    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
82    pub buffer: BufferConfig,
83
84    #[configurable(derived)]
85    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
86    pub proxy: ProxyConfig,
87
88    #[serde(flatten)]
89    #[configurable(metadata(docs::hidden))]
90    pub inner: BoxedSink,
91}
92
93impl<T> SinkOuter<T>
94where
95    T: Configurable + Serialize,
96{
97    pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
98    where
99        I: IntoIterator<Item = T>,
100        IS: Into<BoxedSink>,
101    {
102        SinkOuter {
103            inputs: Inputs::from_iter(inputs),
104            buffer: Default::default(),
105            healthcheck: SinkHealthcheckOptions::default(),
106            healthcheck_uri: None,
107            inner: inner.into(),
108            proxy: Default::default(),
109            graph: Default::default(),
110        }
111    }
112
113    pub fn resources(&self, id: &ComponentKey) -> Vec<Resource> {
114        let mut resources = self.inner.resources();
115        for stage in self.buffer.stages() {
116            match stage {
117                BufferType::Memory { .. } => {}
118                BufferType::DiskV2 { .. } => resources.push(Resource::DiskBuffer(id.to_string())),
119            }
120        }
121        resources
122    }
123
124    pub fn healthcheck(&self) -> SinkHealthcheckOptions {
125        if self.healthcheck_uri.is_some() && self.healthcheck.uri.is_some() {
126            warn!(
127                "Both `healthcheck.uri` and `healthcheck_uri` options are specified. Using value of `healthcheck.uri`."
128            )
129        } else if self.healthcheck_uri.is_some() {
130            warn!(
131                "The `healthcheck_uri` option has been deprecated, use `healthcheck.uri` instead."
132            )
133        }
134        SinkHealthcheckOptions {
135            uri: self
136                .healthcheck
137                .uri
138                .clone()
139                .or_else(|| self.healthcheck_uri.clone()),
140            ..self.healthcheck.clone()
141        }
142    }
143
144    pub const fn proxy(&self) -> &ProxyConfig {
145        &self.proxy
146    }
147
148    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> SinkOuter<U>
149    where
150        U: Configurable + Serialize,
151    {
152        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
153        self.with_inputs(inputs)
154    }
155
156    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> SinkOuter<U>
157    where
158        I: IntoIterator<Item = U>,
159        U: Configurable + Serialize,
160    {
161        SinkOuter {
162            inputs: Inputs::from_iter(inputs),
163            inner: self.inner,
164            buffer: self.buffer,
165            healthcheck: self.healthcheck,
166            healthcheck_uri: self.healthcheck_uri,
167            proxy: self.proxy,
168            graph: self.graph,
169        }
170    }
171}
172
173/// Healthcheck configuration.
174#[serde_as]
175#[configurable_component]
176#[derive(Clone, Debug)]
177#[serde(default)]
178pub struct SinkHealthcheckOptions {
179    /// Whether or not to check the health of the sink when Vector starts up.
180    pub enabled: bool,
181
182    /// Timeout duration for healthcheck in seconds.
183    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
184    #[serde(
185        default = "default_healthcheck_timeout",
186        skip_serializing_if = "is_default_healthcheck_timeout"
187    )]
188    pub timeout: Duration,
189
190    /// The full URI to make HTTP healthcheck requests to.
191    ///
192    /// This must be a valid URI, which requires at least the scheme and host. All other
193    /// components -- port, path, etc -- are allowed as well.
194    #[configurable(validation(format = "uri"))]
195    pub uri: Option<UriSerde>,
196}
197
198const fn default_healthcheck_timeout() -> Duration {
199    Duration::from_secs(10)
200}
201
202fn is_default_healthcheck_timeout(timeout: &Duration) -> bool {
203    timeout == &default_healthcheck_timeout()
204}
205
206impl Default for SinkHealthcheckOptions {
207    fn default() -> Self {
208        Self {
209            enabled: true,
210            uri: None,
211            timeout: default_healthcheck_timeout(),
212        }
213    }
214}
215
216impl From<bool> for SinkHealthcheckOptions {
217    fn from(enabled: bool) -> Self {
218        Self {
219            enabled,
220            ..Default::default()
221        }
222    }
223}
224
225impl From<UriSerde> for SinkHealthcheckOptions {
226    fn from(uri: UriSerde) -> Self {
227        Self {
228            uri: Some(uri),
229            ..Default::default()
230        }
231    }
232}
233
234/// Generalized interface for describing and building sink components.
235#[async_trait]
236#[typetag::serde(tag = "type")]
237pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
238    /// Builds the sink with the given context.
239    ///
240    /// If the sink is built successfully, `Ok(...)` is returned containing the sink and the sink's
241    /// healthcheck.
242    ///
243    /// # Errors
244    ///
245    /// If an error occurs while building the sink, an error variant explaining the issue is
246    /// returned.
247    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)>;
248
249    /// Gets the input configuration for this sink.
250    fn input(&self) -> Input;
251
252    /// Gets the files to watch to trigger reload
253    fn files_to_watch(&self) -> Vec<&PathBuf> {
254        Vec::new()
255    }
256
257    /// Gets the list of resources, if any, used by this sink.
258    ///
259    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
260    /// cannot be shared between components at runtime. This ensures that components can not be
261    /// configured in a way that would deadlock the spawning of a topology, and as well, allows
262    /// Vector to determine the correct order for rebuilding a topology during configuration reload
263    /// when resources must first be reclaimed before being reassigned, and so on.
264    fn resources(&self) -> Vec<Resource> {
265        Vec::new()
266    }
267
268    /// Gets the acknowledgements configuration for this sink.
269    fn acknowledgements(&self) -> &AcknowledgementsConfig;
270}
271
272dyn_clone::clone_trait_object!(SinkConfig);
273
274#[derive(Clone, Debug)]
275pub struct SinkContext {
276    pub healthcheck: SinkHealthcheckOptions,
277    pub globals: GlobalOptions,
278    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
279    pub proxy: ProxyConfig,
280    pub schema: schema::Options,
281    pub app_name: String,
282    pub app_name_slug: String,
283
284    /// Extra context data provided by the running app and shared across all components. This can be
285    /// used to pass shared settings or other data from outside the components.
286    pub extra_context: ExtraContext,
287}
288
289impl Default for SinkContext {
290    fn default() -> Self {
291        Self {
292            healthcheck: Default::default(),
293            globals: Default::default(),
294            enrichment_tables: Default::default(),
295            proxy: Default::default(),
296            schema: Default::default(),
297            app_name: crate::get_app_name().to_string(),
298            app_name_slug: crate::get_slugified_app_name(),
299            extra_context: Default::default(),
300        }
301    }
302}
303
304impl SinkContext {
305    pub const fn globals(&self) -> &GlobalOptions {
306        &self.globals
307    }
308
309    pub const fn proxy(&self) -> &ProxyConfig {
310        &self.proxy
311    }
312}