1use std::cell::RefCell;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use dyn_clone::DynClone;
6use serde::Serialize;
7use serde_with::serde_as;
8use std::path::PathBuf;
9use vector_lib::buffers::{BufferConfig, BufferType};
10use vector_lib::configurable::attributes::CustomAttribute;
11use vector_lib::configurable::schema::{SchemaGenerator, SchemaObject};
12use vector_lib::configurable::{
13 configurable_component, Configurable, GenerateError, Metadata, NamedComponent,
14};
15use vector_lib::{
16 config::{AcknowledgementsConfig, GlobalOptions, Input},
17 id::Inputs,
18 sink::VectorSink,
19};
20
21use super::{dot_graph::GraphConfig, schema, ComponentKey, ProxyConfig, Resource};
22use crate::extra_context::ExtraContext;
23use crate::sinks::{util::UriSerde, Healthcheck};
24
/// Heap-allocated, dynamically dispatched sink configuration (`Box<dyn SinkConfig>`).
pub type BoxedSink = Box<dyn SinkConfig>;
26
27impl Configurable for BoxedSink {
28 fn referenceable_name() -> Option<&'static str> {
29 Some("vector::sinks::Sinks")
30 }
31
32 fn metadata() -> Metadata {
33 let mut metadata = Metadata::default();
34 metadata.set_description("Configurable sinks in Vector.");
35 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
36 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
37 metadata
38 }
39
40 fn generate_schema(
41 generator: &RefCell<SchemaGenerator>,
42 ) -> Result<SchemaObject, GenerateError> {
43 vector_lib::configurable::component::SinkDescription::generate_schemas(generator)
44 }
45}
46
47impl<T: SinkConfig + 'static> From<T> for BoxedSink {
48 fn from(value: T) -> Self {
49 Box::new(value)
50 }
51}
52
#[configurable_component]
/// Sink component wrapper: a type-erased sink configuration (`inner`) together with
/// the options declared alongside it here (inputs, healthcheck, buffer, proxy, graph).
///
/// `T` is the element type of the `inputs` collection.
#[configurable(metadata(docs::component_base_type = "sink"))]
#[derive(Clone, Debug)]
pub struct SinkOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Graph options (from the `dot_graph` module). Defaulted when absent and
    // skipped on serialization via `vector_lib::serde::is_default`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The inputs feeding this sink. There is no serde default on this field.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    #[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))]
    /// Deprecated location for the healthcheck URI; use `healthcheck.uri` instead.
    ///
    /// When both are set, `healthcheck.uri` takes precedence.
    pub healthcheck_uri: Option<UriSerde>,

    // Healthcheck options. Deserialized through `crate::serde::bool_or_struct`;
    // `SinkHealthcheckOptions` also implements `From<bool>`, so this presumably
    // accepts either a bare boolean or the full struct -- TODO confirm against the helper.
    #[configurable(derived, metadata(docs::advanced))]
    #[serde(default, deserialize_with = "crate::serde::bool_or_struct")]
    pub healthcheck: SinkHealthcheckOptions,

    // Buffer configuration; its stages are what `resources()` scans for disk buffers.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub buffer: BufferConfig,

    // Proxy configuration for this sink, exposed through `proxy()`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub proxy: ProxyConfig,

    // The sink-specific configuration. Flattened, so its fields sit at the same
    // level as the options above.
    #[serde(flatten)]
    #[configurable(metadata(docs::hidden))]
    pub inner: BoxedSink,
}
91
92impl<T> SinkOuter<T>
93where
94 T: Configurable + Serialize,
95{
96 pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
97 where
98 I: IntoIterator<Item = T>,
99 IS: Into<BoxedSink>,
100 {
101 SinkOuter {
102 inputs: Inputs::from_iter(inputs),
103 buffer: Default::default(),
104 healthcheck: SinkHealthcheckOptions::default(),
105 healthcheck_uri: None,
106 inner: inner.into(),
107 proxy: Default::default(),
108 graph: Default::default(),
109 }
110 }
111
112 pub fn resources(&self, id: &ComponentKey) -> Vec<Resource> {
113 let mut resources = self.inner.resources();
114 for stage in self.buffer.stages() {
115 match stage {
116 BufferType::Memory { .. } => {}
117 BufferType::DiskV2 { .. } => resources.push(Resource::DiskBuffer(id.to_string())),
118 }
119 }
120 resources
121 }
122
123 pub fn healthcheck(&self) -> SinkHealthcheckOptions {
124 if self.healthcheck_uri.is_some() && self.healthcheck.uri.is_some() {
125 warn!("Both `healthcheck.uri` and `healthcheck_uri` options are specified. Using value of `healthcheck.uri`.")
126 } else if self.healthcheck_uri.is_some() {
127 warn!(
128 "The `healthcheck_uri` option has been deprecated, use `healthcheck.uri` instead."
129 )
130 }
131 SinkHealthcheckOptions {
132 uri: self
133 .healthcheck
134 .uri
135 .clone()
136 .or_else(|| self.healthcheck_uri.clone()),
137 ..self.healthcheck.clone()
138 }
139 }
140
141 pub const fn proxy(&self) -> &ProxyConfig {
142 &self.proxy
143 }
144
145 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> SinkOuter<U>
146 where
147 U: Configurable + Serialize,
148 {
149 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
150 self.with_inputs(inputs)
151 }
152
153 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> SinkOuter<U>
154 where
155 I: IntoIterator<Item = U>,
156 U: Configurable + Serialize,
157 {
158 SinkOuter {
159 inputs: Inputs::from_iter(inputs),
160 inner: self.inner,
161 buffer: self.buffer,
162 healthcheck: self.healthcheck,
163 healthcheck_uri: self.healthcheck_uri,
164 proxy: self.proxy,
165 graph: self.graph,
166 }
167 }
168}
169
#[serde_as]
/// Healthcheck options for a sink.
///
/// Because of `#[serde(default)]`, any field missing from the input falls back to
/// the value provided by this type's `Default` implementation.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct SinkHealthcheckOptions {
    /// Whether the healthcheck is enabled. Defaults to `true`.
    pub enabled: bool,

    /// Timeout for the healthcheck, written as a (possibly fractional) number of seconds.
    ///
    /// Defaults to 10 seconds; the default value is left out when serializing.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(
        default = "default_healthcheck_timeout",
        skip_serializing_if = "is_default_healthcheck_timeout"
    )]
    pub timeout: Duration,

    /// Optional URI for the healthcheck. Unset by default.
    #[configurable(validation(format = "uri"))]
    pub uri: Option<UriSerde>,
}
194
/// Default healthcheck timeout: ten seconds.
const fn default_healthcheck_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 10;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
198
199fn is_default_healthcheck_timeout(timeout: &Duration) -> bool {
200 timeout == &default_healthcheck_timeout()
201}
202
203impl Default for SinkHealthcheckOptions {
204 fn default() -> Self {
205 Self {
206 enabled: true,
207 uri: None,
208 timeout: default_healthcheck_timeout(),
209 }
210 }
211}
212
213impl From<bool> for SinkHealthcheckOptions {
214 fn from(enabled: bool) -> Self {
215 Self {
216 enabled,
217 ..Default::default()
218 }
219 }
220}
221
222impl From<UriSerde> for SinkHealthcheckOptions {
223 fn from(uri: UriSerde) -> Self {
224 Self {
225 uri: Some(uri),
226 ..Default::default()
227 }
228 }
229}
230
/// Configuration interface implemented by every sink.
///
/// Implementations are used as trait objects (`Box<dyn SinkConfig>`), serialized
/// through `typetag` with the concrete sink selected by the `type` field.
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
    /// Builds the sink described by this configuration.
    ///
    /// On success, returns the sink together with its healthcheck.
    ///
    /// # Errors
    ///
    /// Returns a `crate::Error` when the sink cannot be built; the conditions are
    /// specific to each implementation.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)>;

    /// Returns the `Input` description for this sink.
    fn input(&self) -> Input;

    /// Returns file paths associated with this sink; empty by default.
    // NOTE(review): presumably these are watched for changes by the caller -- confirm.
    fn files_to_watch(&self) -> Vec<&PathBuf> {
        Vec::new()
    }

    /// Returns the resources this sink claims; empty by default.
    ///
    /// `SinkOuter::resources` extends this list with disk buffer resources.
    fn resources(&self) -> Vec<Resource> {
        Vec::new()
    }

    /// Returns the acknowledgements configuration of this sink.
    fn acknowledgements(&self) -> &AcknowledgementsConfig;
}

// Makes `Box<dyn SinkConfig>` cloneable, which `SinkOuter`'s `#[derive(Clone)]` relies on.
dyn_clone::clone_trait_object!(SinkConfig);
270
/// Context passed to `SinkConfig::build`.
#[derive(Clone, Debug)]
pub struct SinkContext {
    /// Healthcheck options for the sink being built.
    pub healthcheck: SinkHealthcheckOptions,
    /// Global options; also available through `globals()`.
    pub globals: GlobalOptions,
    /// Enrichment table registry.
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
    /// Proxy configuration; also available through `proxy()`.
    pub proxy: ProxyConfig,
    /// Schema options.
    pub schema: schema::Options,
    /// Application name; `Default` fills it from `crate::get_app_name()`.
    pub app_name: String,
    /// Slugified application name; `Default` fills it from `crate::get_slugified_app_name()`.
    pub app_name_slug: String,

    /// Extra context data (see `crate::extra_context::ExtraContext`).
    pub extra_context: ExtraContext,
}
285
286impl Default for SinkContext {
287 fn default() -> Self {
288 Self {
289 healthcheck: Default::default(),
290 globals: Default::default(),
291 enrichment_tables: Default::default(),
292 proxy: Default::default(),
293 schema: Default::default(),
294 app_name: crate::get_app_name().to_string(),
295 app_name_slug: crate::get_slugified_app_name(),
296 extra_context: Default::default(),
297 }
298 }
299}
300
impl SinkContext {
    /// Borrows the global options carried by this context.
    pub const fn globals(&self) -> &GlobalOptions {
        &self.globals
    }

    /// Borrows the proxy configuration carried by this context.
    pub const fn proxy(&self) -> &ProxyConfig {
        &self.proxy
    }
}