vector/config/
sink.rs

1use std::cell::RefCell;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use dyn_clone::DynClone;
6use serde::Serialize;
7use serde_with::serde_as;
8use std::path::PathBuf;
9use vector_lib::buffers::{BufferConfig, BufferType};
10use vector_lib::configurable::attributes::CustomAttribute;
11use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
12use vector_lib::configurable::{
13    configurable_component, Configurable, GenerateError, Metadata, NamedComponent,
14};
15use vector_lib::{
16    config::{AcknowledgementsConfig, GlobalOptions, Input},
17    id::Inputs,
18    sink::VectorSink,
19};
20
21use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
22use crate::extra_context::ExtraContext;
23use crate::sinks::{util::UriSerde, Healthcheck};
24
/// A heap-allocated, type-erased sink configuration (any [`SinkConfig`] implementor).
pub type BoxedSink = Box<dyn SinkConfig>;
26
27impl Configurable for BoxedSink {
28    fn referenceable_name() -> Option<&'static str> {
29        Some("vector::sinks::Sinks")
30    }
31
32    fn metadata() -> Metadata {
33        let mut metadata = Metadata::default();
34        metadata.set_description("Configurable sinks in Vector.");
35        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
36        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
37        metadata
38    }
39
40    fn generate_schema(
41        generator: &RefCell<SchemaGenerator>,
42    ) -> Result<SchemaObject, GenerateError> {
43        vector_lib::configurable::component::SinkDescription::generate_schemas(generator)
44    }
45}
46
47impl<T: SinkConfig + 'static> From<T> for BoxedSink {
48    fn from(value: T) -> Self {
49        Box::new(value)
50    }
51}
52
/// Fully resolved sink component.
// Wraps a concrete sink configuration (`inner`) together with the options declared on this
// struct: inputs, healthcheck, buffer, proxy, and graph settings. `T` is the type used for
// the entries of `inputs`.
// NOTE(review): `///` comments on this type and its fields appear to feed the generated
// configuration schema through `configurable_component`, so explanatory notes here are
// written as plain `//` comments -- confirm before converting any of them.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "sink"))]
#[derive(Clone, Debug)]
pub struct SinkOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Falls back to `Default` when absent; skipped on serialization via
    // `vector_lib::serde::is_default` (presumably when equal to the default value).
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    #[configurable(derived)]
    pub inputs: Inputs<T>,

    /// The full URI to make HTTP healthcheck requests to.
    ///
    /// This must be a valid URI, which requires at least the scheme and host. All other
    /// components -- port, path, etc -- are allowed as well.
    // Deprecated top-level spelling of `healthcheck.uri`. `SinkOuter::healthcheck` uses it
    // only when `healthcheck.uri` is unset, and logs a warning whenever it is present.
    #[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))]
    pub healthcheck_uri: Option<UriSerde>,

    // Deserialized through `crate::serde::bool_or_struct` -- presumably accepting either a
    // bare boolean or the full options struct (a `From<bool>` impl exists on
    // `SinkHealthcheckOptions`); confirm against `crate::serde`.
    #[configurable(derived, metadata(docs::advanced))]
    #[serde(default, deserialize_with = "crate::serde::bool_or_struct")]
    pub healthcheck: SinkHealthcheckOptions,

    // Each `DiskV2` stage configured here is reported as a `Resource::DiskBuffer` by
    // `SinkOuter::resources`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub buffer: BufferConfig,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // Flattened: the concrete sink's own options (and its `type` tag, see the `typetag`
    // attribute on `SinkConfig`) are (de)serialized at the same level as the fields above.
    #[serde(flatten)]
    #[configurable(metadata(docs::hidden))]
    pub inner: BoxedSink,
}
91
92impl<T> SinkOuter<T>
93where
94    T: Configurable + Serialize,
95{
96    pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
97    where
98        I: IntoIterator<Item = T>,
99        IS: Into<BoxedSink>,
100    {
101        SinkOuter {
102            inputs: Inputs::from_iter(inputs),
103            buffer: Default::default(),
104            healthcheck: SinkHealthcheckOptions::default(),
105            healthcheck_uri: None,
106            inner: inner.into(),
107            proxy: Default::default(),
108            graph: Default::default(),
109        }
110    }
111
112    pub fn resources(&self, id: &ComponentKey) -> Vec<Resource> {
113        let mut resources = self.inner.resources();
114        for stage in self.buffer.stages() {
115            match stage {
116                BufferType::Memory { .. } => {}
117                BufferType::DiskV2 { .. } => resources.push(Resource::DiskBuffer(id.to_string())),
118            }
119        }
120        resources
121    }
122
123    pub fn healthcheck(&self) -> SinkHealthcheckOptions {
124        if self.healthcheck_uri.is_some() && self.healthcheck.uri.is_some() {
125            warn!("Both `healthcheck.uri` and `healthcheck_uri` options are specified. Using value of `healthcheck.uri`.")
126        } else if self.healthcheck_uri.is_some() {
127            warn!(
128                "The `healthcheck_uri` option has been deprecated, use `healthcheck.uri` instead."
129            )
130        }
131        SinkHealthcheckOptions {
132            uri: self
133                .healthcheck
134                .uri
135                .clone()
136                .or_else(|| self.healthcheck_uri.clone()),
137            ..self.healthcheck.clone()
138        }
139    }
140
141    pub const fn proxy(&self) -> &ProxyConfig {
142        &self.proxy
143    }
144
145    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> SinkOuter<U>
146    where
147        U: Configurable + Serialize,
148    {
149        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
150        self.with_inputs(inputs)
151    }
152
153    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> SinkOuter<U>
154    where
155        I: IntoIterator<Item = U>,
156        U: Configurable + Serialize,
157    {
158        SinkOuter {
159            inputs: Inputs::from_iter(inputs),
160            inner: self.inner,
161            buffer: self.buffer,
162            healthcheck: self.healthcheck,
163            healthcheck_uri: self.healthcheck_uri,
164            proxy: self.proxy,
165            graph: self.graph,
166        }
167    }
168}
169
/// Healthcheck configuration.
// NOTE(review): `///` comments here appear to feed the generated configuration schema via
// `configurable_component`, so explanatory notes are written as plain `//` comments --
// confirm before converting any of them.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
// Fields missing from the input are taken from this type's `Default` impl
// (enabled, no URI, default timeout).
#[serde(default)]
pub struct SinkHealthcheckOptions {
    /// Whether or not to check the health of the sink when Vector starts up.
    pub enabled: bool,

    /// Timeout duration for healthcheck in seconds.
    // (De)serialized through `serde_with::DurationSecondsWithFrac<f64>`, i.e. as a number of
    // seconds that may carry a fractional part. Left out of serialized output when it equals
    // `default_healthcheck_timeout()`.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(
        default = "default_healthcheck_timeout",
        skip_serializing_if = "is_default_healthcheck_timeout"
    )]
    pub timeout: Duration,

    /// The full URI to make HTTP healthcheck requests to.
    ///
    /// This must be a valid URI, which requires at least the scheme and host. All other
    /// components -- port, path, etc -- are allowed as well.
    #[configurable(validation(format = "uri"))]
    pub uri: Option<UriSerde>,
}
194
/// Returns the default healthcheck timeout: ten seconds.
const fn default_healthcheck_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
198
199fn is_default_healthcheck_timeout(timeout: &Duration) -> bool {
200    timeout == &default_healthcheck_timeout()
201}
202
203impl Default for SinkHealthcheckOptions {
204    fn default() -> Self {
205        Self {
206            enabled: true,
207            uri: None,
208            timeout: default_healthcheck_timeout(),
209        }
210    }
211}
212
213impl From<bool> for SinkHealthcheckOptions {
214    fn from(enabled: bool) -> Self {
215        Self {
216            enabled,
217            ..Default::default()
218        }
219    }
220}
221
222impl From<UriSerde> for SinkHealthcheckOptions {
223    fn from(uri: UriSerde) -> Self {
224        Self {
225            uri: Some(uri),
226            ..Default::default()
227        }
228    }
229}
230
231/// Generalized interface for describing and building sink components.
232#[async_trait]
233#[typetag::serde(tag = "type")]
234pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
235    /// Builds the sink with the given context.
236    ///
237    /// If the sink is built successfully, `Ok(...)` is returned containing the sink and the sink's
238    /// healthcheck.
239    ///
240    /// # Errors
241    ///
242    /// If an error occurs while building the sink, an error variant explaining the issue is
243    /// returned.
244    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)>;
245
246    /// Gets the input configuration for this sink.
247    fn input(&self) -> Input;
248
249    /// Gets the files to watch to trigger reload
250    fn files_to_watch(&self) -> Vec<&PathBuf> {
251        Vec::new()
252    }
253
254    /// Gets the list of resources, if any, used by this sink.
255    ///
256    /// Resources represent dependencies -- network ports, file descriptors, and so on -- that
257    /// cannot be shared between components at runtime. This ensures that components can not be
258    /// configured in a way that would deadlock the spawning of a topology, and as well, allows
259    /// Vector to determine the correct order for rebuilding a topology during configuration reload
260    /// when resources must first be reclaimed before being reassigned, and so on.
261    fn resources(&self) -> Vec<Resource> {
262        Vec::new()
263    }
264
265    /// Gets the acknowledgements configuration for this sink.
266    fn acknowledgements(&self) -> &AcknowledgementsConfig;
267}
268
// Makes `Box<dyn SinkConfig>` (i.e. `BoxedSink`) cloneable; `SinkOuter` derives `Clone` and
// holds a `BoxedSink`, so it relies on this.
dyn_clone::clone_trait_object!(SinkConfig);
270
/// Context handed to [`SinkConfig::build`] when a sink is constructed.
#[derive(Clone, Debug)]
pub struct SinkContext {
    /// Healthcheck options made available to the sink.
    pub healthcheck: SinkHealthcheckOptions,
    /// Global options; also reachable through [`SinkContext::globals`].
    pub globals: GlobalOptions,
    /// Registry of enrichment tables.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Proxy configuration; also reachable through [`SinkContext::proxy`].
    pub proxy: ProxyConfig,
    /// Schema options.
    pub schema: schema::Options,
    /// Application name; the `Default` impl fills this from `crate::get_app_name()`.
    pub app_name: String,
    /// Slugified application name; the `Default` impl fills this from
    /// `crate::get_slugified_app_name()`.
    pub app_name_slug: String,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
285
286impl Default for SinkContext {
287    fn default() -> Self {
288        Self {
289            healthcheck: Default::default(),
290            globals: Default::default(),
291            enrichment_tables: Default::default(),
292            proxy: Default::default(),
293            schema: Default::default(),
294            app_name: crate::get_app_name().to_string(),
295            app_name_slug: crate::get_slugified_app_name(),
296            extra_context: Default::default(),
297        }
298    }
299}
300
301impl SinkContext {
302    pub const fn globals(&self) -> &GlobalOptions {
303        &self.globals
304    }
305
306    pub const fn proxy(&self) -> &ProxyConfig {
307        &self.proxy
308    }
309}