vector/sinks/databricks_zerobus/
config.rs1use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8 prelude::*,
9 util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
14#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(tag = "strategy", rename_all = "snake_case")]
18#[configurable(metadata(
19 docs::enum_tag_description = "The authentication strategy to use for Databricks."
20))]
21pub enum DatabricksAuthentication {
22 #[serde(rename = "oauth")]
24 OAuth {
25 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
27 #[configurable(metadata(docs::examples = "abc123..."))]
28 client_id: SensitiveString,
29
30 #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
32 #[configurable(metadata(docs::examples = "secret123..."))]
33 client_secret: SensitiveString,
34 },
35}
36
37impl DatabricksAuthentication {
38 pub fn credentials(&self) -> (&str, &str) {
40 match self {
41 DatabricksAuthentication::OAuth {
42 client_id,
43 client_secret,
44 } => (client_id.inner(), client_secret.inner()),
45 }
46 }
47}
48
49#[configurable_component]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct ZerobusStreamOptions {
57 #[serde(default = "default_flush_timeout_ms")]
59 #[configurable(metadata(docs::examples = 30000))]
60 pub flush_timeout_ms: u64,
61
62 #[serde(default = "default_server_ack_timeout_ms")]
64 #[configurable(metadata(docs::examples = 60000))]
65 pub server_lack_of_ack_timeout_ms: u64,
66}
67
68impl Default for ZerobusStreamOptions {
69 fn default() -> Self {
70 Self {
71 flush_timeout_ms: default_flush_timeout_ms(),
72 server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
73 }
74 }
75}
76
77#[configurable_component(sink(
79 "databricks_zerobus",
80 "Stream observability data to Databricks Unity Catalog via Zerobus."
81))]
82#[derive(Clone, Debug)]
83#[serde(deny_unknown_fields)]
84pub struct ZerobusSinkConfig {
85 #[configurable(metadata(
94 docs::examples = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
95 ))]
96 #[configurable(metadata(
97 docs::examples = "https://6543210987654321.zerobus.us-east-1.cloud.databricks.com"
98 ))]
99 pub ingestion_endpoint: String,
100
101 #[configurable(metadata(docs::examples = "main.default.logs"))]
110 #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
111 pub table_name: String,
112
113 #[configurable(metadata(docs::examples = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"))]
122 #[configurable(metadata(docs::examples = "https://dbc-f6e5d4c3-b2a1.cloud.databricks.com"))]
123 pub unity_catalog_endpoint: String,
124
125 #[configurable(derived)]
132 pub auth: DatabricksAuthentication,
133
134 #[configurable(derived)]
135 #[serde(default)]
136 pub stream_options: ZerobusStreamOptions,
137
138 #[configurable(derived)]
139 #[serde(default)]
140 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
141
142 #[configurable(derived)]
143 #[serde(default)]
144 pub request: TowerRequestConfig,
145
146 #[configurable(derived)]
147 #[serde(
148 default,
149 deserialize_with = "crate::serde::bool_or_struct",
150 skip_serializing_if = "crate::serde::is_default"
151 )]
152 pub acknowledgements: AcknowledgementsConfig,
153}
154
155impl GenerateConfig for ZerobusSinkConfig {
156 fn generate_config() -> toml::Value {
157 toml::Value::try_from(Self {
158 ingestion_endpoint: "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
159 .to_string(),
160 table_name: "main.default.logs".to_string(),
161 unity_catalog_endpoint: "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com".to_string(),
162 auth: DatabricksAuthentication::OAuth {
163 client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
164 client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
165 },
166 stream_options: ZerobusStreamOptions::default(),
167 batch: BatchConfig::default(),
168 request: TowerRequestConfig::default(),
169 acknowledgements: AcknowledgementsConfig::default(),
170 })
171 .unwrap()
172 }
173}
174
175#[async_trait::async_trait]
176#[typetag::serde(name = "databricks_zerobus")]
177impl SinkConfig for ZerobusSinkConfig {
178 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
179 self.validate()?;
180
181 let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
182 let healthcheck_service = service.clone();
183
184 let request_limits = self.request.into_settings();
185
186 let sink = ZerobusSink::new(service, request_limits, self.batch)?;
187
188 let healthcheck = async move {
189 healthcheck_service
190 .ensure_stream()
191 .await
192 .map_err(|e| e.into())
193 };
194
195 Ok((
196 VectorSink::from_event_streamsink(sink),
197 Box::pin(healthcheck),
198 ))
199 }
200
201 fn input(&self) -> Input {
202 Input::log()
203 }
204
205 fn acknowledgements(&self) -> &AcknowledgementsConfig {
206 &self.acknowledgements
207 }
208}
209
210impl ZerobusSinkConfig {
211 pub fn validate(&self) -> Result<(), ZerobusSinkError> {
212 if self.ingestion_endpoint.is_empty() {
213 return Err(ZerobusSinkError::ConfigError {
214 message: "ingestion_endpoint cannot be empty".to_string(),
215 });
216 }
217
218 if self.table_name.is_empty() {
219 return Err(ZerobusSinkError::ConfigError {
220 message: "table_name cannot be empty".to_string(),
221 });
222 }
223
224 let parts: Vec<&str> = self.table_name.split('.').collect();
225 if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
226 return Err(ZerobusSinkError::ConfigError {
227 message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
228 .to_string(),
229 });
230 }
231
232 if self.unity_catalog_endpoint.is_empty() {
233 return Err(ZerobusSinkError::ConfigError {
234 message: "unity_catalog_endpoint cannot be empty".to_string(),
235 });
236 }
237
238 match &self.auth {
240 DatabricksAuthentication::OAuth {
241 client_id,
242 client_secret,
243 } => {
244 if client_id.inner().is_empty() {
245 return Err(ZerobusSinkError::ConfigError {
246 message: "OAuth client_id cannot be empty".to_string(),
247 });
248 }
249 if client_secret.inner().is_empty() {
250 return Err(ZerobusSinkError::ConfigError {
251 message: "OAuth client_secret cannot be empty".to_string(),
252 });
253 }
254 }
255 }
256
257 if let Some(max_bytes) = self.batch.max_bytes {
258 if max_bytes > 10_000_000 {
264 return Err(ZerobusSinkError::ConfigError {
265 message: "max_bytes must be less than or equal to 10MB".to_string(),
266 });
267 }
268 }
269
270 Ok(())
271 }
272}
273
/// Serde default for `ZerobusStreamOptions::flush_timeout_ms`: 30,000 milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30_000
}
278
/// Serde default for `ZerobusStreamOptions::server_lack_of_ack_timeout_ms`:
/// 60,000 milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60_000
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// Returns a configuration that passes `validate`; each test mutates the
    /// one field it wants to exercise.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `config.validate()` fails with a `ConfigError` whose
    /// message contains `expected`. `scenario` names the case in failure output.
    fn assert_config_error(config: &ZerobusSinkConfig, expected: &str, scenario: &str) {
        let result = config.validate();
        assert!(result.is_err(), "expected error for {scenario}");

        if let Err(crate::sinks::databricks_zerobus::error::ZerobusSinkError::ConfigError {
            message,
        }) = result
        {
            assert!(
                message.contains(expected),
                "message {message:?} does not contain {expected:?}"
            );
        } else {
            panic!("Expected ConfigError for {scenario}");
        }
    }

    #[test]
    fn test_config_validation_success() {
        let config = create_test_config();
        assert!(config.validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = "".to_string();

        assert_config_error(
            &config,
            "ingestion_endpoint cannot be empty",
            "empty ingestion_endpoint",
        );
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = "".to_string();

        assert_config_error(&config, "table_name cannot be empty", "empty table_name");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string();

        assert_config_error(
            &config,
            "catalog.schema.table",
            "invalid table_name format",
        );
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();

            assert_config_error(
                &config,
                "catalog.schema.table",
                &format!("table_name={bad:?}"),
            );
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = "".to_string();

        assert_config_error(
            &config,
            "unity_catalog_endpoint cannot be empty",
            "empty unity_catalog_endpoint",
        );
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("".to_string()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };

        assert_config_error(
            &config,
            "OAuth client_id cannot be empty",
            "empty OAuth client_id",
        );
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from("".to_string()),
        };

        assert_config_error(
            &config,
            "OAuth client_secret cannot be empty",
            "empty OAuth client_secret",
        );
    }

    #[test]
    fn test_config_validation_max_bytes_over_limit() {
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_001);

        assert_config_error(
            &config,
            "max_bytes must be less than or equal to 10MB",
            "batch.max_bytes above the limit",
        );
    }

    #[test]
    fn test_config_validation_max_bytes_at_limit() {
        // The limit is inclusive: exactly 10,000,000 must be accepted.
        let mut config = create_test_config();
        config.batch.max_bytes = Some(10_000_000);

        assert!(config.validate().is_ok());
    }

    /// With `max_bytes` unset, the batcher settings built from the batch
    /// configuration carry a 10,000,000 size limit.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}