1use std::{cell::RefCell, path::PathBuf, time::Duration};
2
3use async_trait::async_trait;
4use dyn_clone::DynClone;
5use serde::Serialize;
6use serde_with::serde_as;
7use vector_lib::{
8 buffers::{BufferConfig, BufferType},
9 config::{AcknowledgementsConfig, GlobalOptions, Input},
10 configurable::{
11 Configurable, GenerateError, Metadata, NamedComponent,
12 attributes::CustomAttribute,
13 configurable_component,
14 schema::{SchemaGenerator, SchemaObject},
15 },
16 id::Inputs,
17 sink::VectorSink,
18};
19
20use super::{ComponentKey, ProxyConfig, Resource, dot_graph::GraphConfig, schema};
21use crate::{
22 extra_context::ExtraContext,
23 sinks::{Healthcheck, util::UriSerde},
24};
25
26pub type BoxedSink = Box<dyn SinkConfig>;
27
28impl Configurable for BoxedSink {
29 fn referenceable_name() -> Option<&'static str> {
30 Some("vector::sinks::Sinks")
31 }
32
33 fn metadata() -> Metadata {
34 let mut metadata = Metadata::default();
35 metadata.set_description("Configurable sinks in Vector.");
36 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
37 metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
38 metadata
39 }
40
41 fn generate_schema(
42 generator: &RefCell<SchemaGenerator>,
43 ) -> Result<SchemaObject, GenerateError> {
44 vector_lib::configurable::component::SinkDescription::generate_schemas(generator)
45 }
46}
47
48impl<T: SinkConfig + 'static> From<T> for BoxedSink {
49 fn from(value: T) -> Self {
50 Box::new(value)
51 }
52}
53
54#[configurable_component]
56#[configurable(metadata(docs::component_base_type = "sink"))]
57#[derive(Clone, Debug)]
58pub struct SinkOuter<T>
59where
60 T: Configurable + Serialize + 'static,
61{
62 #[configurable(derived)]
63 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
64 pub graph: GraphConfig,
65
66 #[configurable(derived)]
67 pub inputs: Inputs<T>,
68
69 #[configurable(deprecated, metadata(docs::hidden), validation(format = "uri"))]
74 pub healthcheck_uri: Option<UriSerde>,
75
76 #[configurable(derived, metadata(docs::advanced))]
77 #[serde(default, deserialize_with = "crate::serde::bool_or_struct")]
78 pub healthcheck: SinkHealthcheckOptions,
79
80 #[configurable(derived)]
81 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
82 pub buffer: BufferConfig,
83
84 #[configurable(derived)]
85 #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
86 pub proxy: ProxyConfig,
87
88 #[serde(flatten)]
89 #[configurable(metadata(docs::hidden))]
90 pub inner: BoxedSink,
91}
92
93impl<T> SinkOuter<T>
94where
95 T: Configurable + Serialize,
96{
97 pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
98 where
99 I: IntoIterator<Item = T>,
100 IS: Into<BoxedSink>,
101 {
102 SinkOuter {
103 inputs: Inputs::from_iter(inputs),
104 buffer: Default::default(),
105 healthcheck: SinkHealthcheckOptions::default(),
106 healthcheck_uri: None,
107 inner: inner.into(),
108 proxy: Default::default(),
109 graph: Default::default(),
110 }
111 }
112
113 pub fn resources(&self, id: &ComponentKey) -> Vec<Resource> {
114 let mut resources = self.inner.resources();
115 for stage in self.buffer.stages() {
116 match stage {
117 BufferType::Memory { .. } => {}
118 BufferType::DiskV2 { .. } => resources.push(Resource::DiskBuffer(id.to_string())),
119 }
120 }
121 resources
122 }
123
124 pub fn healthcheck(&self) -> SinkHealthcheckOptions {
125 if self.healthcheck_uri.is_some() && self.healthcheck.uri.is_some() {
126 warn!(
127 "Both `healthcheck.uri` and `healthcheck_uri` options are specified. Using value of `healthcheck.uri`."
128 )
129 } else if self.healthcheck_uri.is_some() {
130 warn!(
131 "The `healthcheck_uri` option has been deprecated, use `healthcheck.uri` instead."
132 )
133 }
134 SinkHealthcheckOptions {
135 uri: self
136 .healthcheck
137 .uri
138 .clone()
139 .or_else(|| self.healthcheck_uri.clone()),
140 ..self.healthcheck.clone()
141 }
142 }
143
144 pub const fn proxy(&self) -> &ProxyConfig {
145 &self.proxy
146 }
147
148 pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> SinkOuter<U>
149 where
150 U: Configurable + Serialize,
151 {
152 let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
153 self.with_inputs(inputs)
154 }
155
156 pub(crate) fn with_inputs<I, U>(self, inputs: I) -> SinkOuter<U>
157 where
158 I: IntoIterator<Item = U>,
159 U: Configurable + Serialize,
160 {
161 SinkOuter {
162 inputs: Inputs::from_iter(inputs),
163 inner: self.inner,
164 buffer: self.buffer,
165 healthcheck: self.healthcheck,
166 healthcheck_uri: self.healthcheck_uri,
167 proxy: self.proxy,
168 graph: self.graph,
169 }
170 }
171}
172
173#[serde_as]
175#[configurable_component]
176#[derive(Clone, Debug)]
177#[serde(default)]
178pub struct SinkHealthcheckOptions {
179 pub enabled: bool,
181
182 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
184 #[serde(
185 default = "default_healthcheck_timeout",
186 skip_serializing_if = "is_default_healthcheck_timeout"
187 )]
188 pub timeout: Duration,
189
190 #[configurable(validation(format = "uri"))]
195 pub uri: Option<UriSerde>,
196}
197
198const fn default_healthcheck_timeout() -> Duration {
199 Duration::from_secs(10)
200}
201
202fn is_default_healthcheck_timeout(timeout: &Duration) -> bool {
203 timeout == &default_healthcheck_timeout()
204}
205
206impl Default for SinkHealthcheckOptions {
207 fn default() -> Self {
208 Self {
209 enabled: true,
210 uri: None,
211 timeout: default_healthcheck_timeout(),
212 }
213 }
214}
215
216impl From<bool> for SinkHealthcheckOptions {
217 fn from(enabled: bool) -> Self {
218 Self {
219 enabled,
220 ..Default::default()
221 }
222 }
223}
224
225impl From<UriSerde> for SinkHealthcheckOptions {
226 fn from(uri: UriSerde) -> Self {
227 Self {
228 uri: Some(uri),
229 ..Default::default()
230 }
231 }
232}
233
234#[async_trait]
236#[typetag::serde(tag = "type")]
237pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
238 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)>;
248
249 fn input(&self) -> Input;
251
252 fn files_to_watch(&self) -> Vec<&PathBuf> {
254 Vec::new()
255 }
256
257 fn resources(&self) -> Vec<Resource> {
265 Vec::new()
266 }
267
268 fn acknowledgements(&self) -> &AcknowledgementsConfig;
270}
271
272dyn_clone::clone_trait_object!(SinkConfig);
273
274#[derive(Clone, Debug)]
275pub struct SinkContext {
276 pub healthcheck: SinkHealthcheckOptions,
277 pub globals: GlobalOptions,
278 pub enrichment_tables: vector_lib::enrichment::TableRegistry,
279 pub proxy: ProxyConfig,
280 pub schema: schema::Options,
281 pub app_name: String,
282 pub app_name_slug: String,
283
284 pub extra_context: ExtraContext,
287}
288
289impl Default for SinkContext {
290 fn default() -> Self {
291 Self {
292 healthcheck: Default::default(),
293 globals: Default::default(),
294 enrichment_tables: Default::default(),
295 proxy: Default::default(),
296 schema: Default::default(),
297 app_name: crate::get_app_name().to_string(),
298 app_name_slug: crate::get_slugified_app_name(),
299 extra_context: Default::default(),
300 }
301 }
302}
303
304impl SinkContext {
305 pub const fn globals(&self) -> &GlobalOptions {
306 &self.globals
307 }
308
309 pub const fn proxy(&self) -> &ProxyConfig {
310 &self.proxy
311 }
312}