vector/sinks/databricks_zerobus/
config.rs

1//! Configuration for the Zerobus sink.
2
3use vector_lib::configurable::configurable_component;
4use vector_lib::sensitive_string::SensitiveString;
5
6use crate::config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext};
7use crate::sinks::{
8    prelude::*,
9    util::{BatchConfig, RealtimeSizeBasedDefaultBatchSettings},
10};
11
12use super::{error::ZerobusSinkError, service::ZerobusService, sink::ZerobusSink};
13
/// Authentication configuration for Databricks.
// Internally tagged enum: the variant is selected by the `strategy` key of the `auth` table.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The authentication strategy to use for Databricks."
))]
pub enum DatabricksAuthentication {
    /// Authenticate using OAuth 2.0 client credentials.
    // Explicit rename: `rename_all = "snake_case"` alone would turn `OAuth` into `o_auth`,
    // so this pins the `strategy` tag value to `oauth`.
    #[serde(rename = "oauth")]
    OAuth {
        /// OAuth 2.0 client ID.
        // `ZerobusSinkConfig::validate` rejects an empty value.
        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_ID}"))]
        #[configurable(metadata(docs::examples = "abc123..."))]
        client_id: SensitiveString,

        /// OAuth 2.0 client secret.
        // `ZerobusSinkConfig::validate` rejects an empty value.
        #[configurable(metadata(docs::examples = "${DATABRICKS_CLIENT_SECRET}"))]
        #[configurable(metadata(docs::examples = "secret123..."))]
        client_secret: SensitiveString,
    },
}
36
37impl DatabricksAuthentication {
38    /// Extract the client ID and client secret as string references.
39    pub fn credentials(&self) -> (&str, &str) {
40        match self {
41            DatabricksAuthentication::OAuth {
42                client_id,
43                client_secret,
44            } => (client_id.inner(), client_secret.inner()),
45        }
46    }
47}
48
49/// Zerobus stream configuration options.
50///
51/// This is a thin wrapper around the SDK's `StreamConfigurationOptions` with Vector-specific
52/// configuration attributes and custom defaults suitable for Vector's use case.
53#[configurable_component]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct ZerobusStreamOptions {
57    /// Timeout in milliseconds for flush operations.
58    #[serde(default = "default_flush_timeout_ms")]
59    #[configurable(metadata(docs::examples = 30000))]
60    pub flush_timeout_ms: u64,
61
62    /// Timeout in milliseconds for server acknowledgements.
63    #[serde(default = "default_server_ack_timeout_ms")]
64    #[configurable(metadata(docs::examples = 60000))]
65    pub server_lack_of_ack_timeout_ms: u64,
66}
67
68impl Default for ZerobusStreamOptions {
69    fn default() -> Self {
70        Self {
71            flush_timeout_ms: default_flush_timeout_ms(),
72            server_lack_of_ack_timeout_ms: default_server_ack_timeout_ms(),
73        }
74    }
75}
76
/// Configuration for the Databricks Zerobus sink.
#[configurable_component(sink(
    "databricks_zerobus",
    "Stream observability data to Databricks Unity Catalog via Zerobus."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ZerobusSinkConfig {
    /// The Zerobus ingestion endpoint URL.
    ///
    /// This should be the full URL to the Zerobus ingestion service.
    ///
    /// See the [Databricks Zerobus documentation][zerobus_endpoint] to find your workspace URL and
    /// Zerobus ingest endpoint.
    ///
    /// [zerobus_endpoint]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#get-your-workspace-url-and-zerobus-ingest-endpoint
    // `validate` only rejects an empty value; URL syntax is not checked in this file.
    #[configurable(metadata(
        docs::examples = "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
    ))]
    #[configurable(metadata(
        docs::examples = "https://6543210987654321.zerobus.us-east-1.cloud.databricks.com"
    ))]
    pub ingestion_endpoint: String,

    /// The Unity Catalog table name to write to.
    ///
    /// This should be in the format `catalog.schema.table`.
    ///
    /// See the [Databricks Zerobus documentation][zerobus_table] to create or identify the target
    /// table.
    ///
    /// [zerobus_table]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#create-or-identify-the-target-table
    // `validate` requires exactly three `.`-separated parts, none of them empty.
    #[configurable(metadata(docs::examples = "main.default.logs"))]
    #[configurable(metadata(docs::examples = "main.default.vector_logs"))]
    pub table_name: String,

    /// The Unity Catalog endpoint URL.
    ///
    /// This is used for authentication and table metadata.
    ///
    /// See the [Databricks Zerobus documentation][zerobus_endpoint] to find your workspace URL and
    /// Zerobus ingest endpoint.
    ///
    /// [zerobus_endpoint]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#get-your-workspace-url-and-zerobus-ingest-endpoint
    // `validate` only rejects an empty value; URL syntax is not checked in this file.
    #[configurable(metadata(docs::examples = "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com"))]
    #[configurable(metadata(docs::examples = "https://dbc-f6e5d4c3-b2a1.cloud.databricks.com"))]
    pub unity_catalog_endpoint: String,

    /// Databricks authentication configuration.
    ///
    /// See the [Databricks Zerobus documentation][zerobus_service_principal] to create a service
    /// principal and grant it permissions to write to the target table.
    ///
    /// [zerobus_service_principal]: https://docs.databricks.com/aws/en/ingestion/zerobus-ingest#create-a-service-principal-and-grant-permissions
    // `validate` rejects an empty OAuth `client_id` or `client_secret`.
    #[configurable(derived)]
    pub auth: DatabricksAuthentication,

    // Omitting the whole table yields `ZerobusStreamOptions::default()`.
    #[configurable(derived)]
    #[serde(default)]
    pub stream_options: ZerobusStreamOptions,

    // `validate` rejects `max_bytes` above 10MB (the Zerobus SDK limit). It is passed to
    // `ZerobusSink::new` in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Converted with `into_settings()` in `build` and handed to `ZerobusSink::new`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // `bool_or_struct` accepts either a bare boolean or a full table. The field is skipped on
    // serialization while it still equals its default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
154
155impl GenerateConfig for ZerobusSinkConfig {
156    fn generate_config() -> toml::Value {
157        toml::Value::try_from(Self {
158            ingestion_endpoint: "https://1234567890123456.zerobus.us-west-2.cloud.databricks.com"
159                .to_string(),
160            table_name: "main.default.logs".to_string(),
161            unity_catalog_endpoint: "https://dbc-a1b2c3d4-e5f6.cloud.databricks.com".to_string(),
162            auth: DatabricksAuthentication::OAuth {
163                client_id: SensitiveString::from("${DATABRICKS_CLIENT_ID}".to_string()),
164                client_secret: SensitiveString::from("${DATABRICKS_CLIENT_SECRET}".to_string()),
165            },
166            stream_options: ZerobusStreamOptions::default(),
167            batch: BatchConfig::default(),
168            request: TowerRequestConfig::default(),
169            acknowledgements: AcknowledgementsConfig::default(),
170        })
171        .unwrap()
172    }
173}
174
175#[async_trait::async_trait]
176#[typetag::serde(name = "databricks_zerobus")]
177impl SinkConfig for ZerobusSinkConfig {
178    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
179        self.validate()?;
180
181        let service = ZerobusService::new(self.clone(), cx.proxy()).await?;
182        let healthcheck_service = service.clone();
183
184        let request_limits = self.request.into_settings();
185
186        let sink = ZerobusSink::new(service, request_limits, self.batch)?;
187
188        let healthcheck = async move {
189            healthcheck_service
190                .ensure_stream()
191                .await
192                .map_err(|e| e.into())
193        };
194
195        Ok((
196            VectorSink::from_event_streamsink(sink),
197            Box::pin(healthcheck),
198        ))
199    }
200
201    fn input(&self) -> Input {
202        Input::log()
203    }
204
205    fn acknowledgements(&self) -> &AcknowledgementsConfig {
206        &self.acknowledgements
207    }
208}
209
210impl ZerobusSinkConfig {
211    pub fn validate(&self) -> Result<(), ZerobusSinkError> {
212        if self.ingestion_endpoint.is_empty() {
213            return Err(ZerobusSinkError::ConfigError {
214                message: "ingestion_endpoint cannot be empty".to_string(),
215            });
216        }
217
218        if self.table_name.is_empty() {
219            return Err(ZerobusSinkError::ConfigError {
220                message: "table_name cannot be empty".to_string(),
221            });
222        }
223
224        let parts: Vec<&str> = self.table_name.split('.').collect();
225        if parts.len() != 3 || parts.iter().any(|p| p.is_empty()) {
226            return Err(ZerobusSinkError::ConfigError {
227                message: "table_name must be in format 'catalog.schema.table' (exactly 3 non-empty parts)"
228                    .to_string(),
229            });
230        }
231
232        if self.unity_catalog_endpoint.is_empty() {
233            return Err(ZerobusSinkError::ConfigError {
234                message: "unity_catalog_endpoint cannot be empty".to_string(),
235            });
236        }
237
238        // Validate authentication credentials
239        match &self.auth {
240            DatabricksAuthentication::OAuth {
241                client_id,
242                client_secret,
243            } => {
244                if client_id.inner().is_empty() {
245                    return Err(ZerobusSinkError::ConfigError {
246                        message: "OAuth client_id cannot be empty".to_string(),
247                    });
248                }
249                if client_secret.inner().is_empty() {
250                    return Err(ZerobusSinkError::ConfigError {
251                        message: "OAuth client_secret cannot be empty".to_string(),
252                    });
253                }
254            }
255        }
256
257        if let Some(max_bytes) = self.batch.max_bytes {
258            // Zerobus SDK limits max bytes to 10MB. This cap is a conservative safety limit:
259            // it's measured against Vector's pre-serialization sizing, not the protobuf bytes
260            // the SDK actually sends. Vector's pre-serialization size is generally larger than
261            // the SDK's protobuf-encoded size, so enforcing the 10MB cap here ensures the SDK's
262            // 10MB limit cannot be exceeded at runtime.
263            if max_bytes > 10_000_000 {
264                return Err(ZerobusSinkError::ConfigError {
265                    message: "max_bytes must be less than or equal to 10MB".to_string(),
266                });
267            }
268        }
269
270        Ok(())
271    }
272}
273
// Default value functions, shared by the serde field defaults and `ZerobusStreamOptions::default`.

/// Default for `flush_timeout_ms`: 30 seconds, expressed in milliseconds.
const fn default_flush_timeout_ms() -> u64 {
    30 * 1_000
}

/// Default for `server_lack_of_ack_timeout_ms`: 60 seconds, expressed in milliseconds.
const fn default_server_ack_timeout_ms() -> u64 {
    60 * 1_000
}
282
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::sensitive_string::SensitiveString;

    /// Builds a configuration that passes `validate`; each test breaks exactly one field.
    fn create_test_config() -> ZerobusSinkConfig {
        ZerobusSinkConfig {
            ingestion_endpoint: "https://test.databricks.com".to_string(),
            table_name: "test.default.logs".to_string(),
            unity_catalog_endpoint: "https://test-workspace.databricks.com".to_string(),
            auth: DatabricksAuthentication::OAuth {
                client_id: SensitiveString::from("test-client-id".to_string()),
                client_secret: SensitiveString::from("test-client-secret".to_string()),
            },
            stream_options: ZerobusStreamOptions::default(),
            batch: Default::default(),
            request: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Asserts that `validate` rejects `config` with a `ConfigError` whose message contains
    /// `expected`. Failures are reported at the calling test's line.
    #[track_caller]
    fn assert_config_error(config: &ZerobusSinkConfig, expected: &str) {
        match config.validate() {
            Err(ZerobusSinkError::ConfigError { message }) => assert!(
                message.contains(expected),
                "expected error mentioning {expected:?}, got {message:?}"
            ),
            other => panic!("expected ConfigError mentioning {expected:?}, got {other:?}"),
        }
    }

    #[test]
    fn test_config_validation_success() {
        assert!(create_test_config().validate().is_ok());
    }

    #[test]
    fn test_config_validation_empty_endpoint() {
        let mut config = create_test_config();
        config.ingestion_endpoint = String::new();
        assert_config_error(&config, "ingestion_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_table_name() {
        let mut config = create_test_config();
        config.table_name = String::new();
        assert_config_error(&config, "table_name cannot be empty");
    }

    #[test]
    fn test_config_validation_invalid_table_name() {
        let mut config = create_test_config();
        config.table_name = "invalid_table".to_string(); // Missing dots
        assert_config_error(&config, "catalog.schema.table");
    }

    #[test]
    fn test_config_validation_table_name_empty_segments() {
        for bad in [
            "catalog..table",
            ".schema.table",
            "catalog.schema.",
            "..",
            "catalog.schema.table.extra",
        ] {
            let mut config = create_test_config();
            config.table_name = bad.to_string();
            assert!(
                config.validate().is_err(),
                "expected error for table_name={bad:?}"
            );
            assert_config_error(&config, "catalog.schema.table");
        }
    }

    #[test]
    fn test_config_validation_empty_unity_catalog_endpoint() {
        let mut config = create_test_config();
        config.unity_catalog_endpoint = String::new();
        assert_config_error(&config, "unity_catalog_endpoint cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_credentials() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from(String::new()),
            client_secret: SensitiveString::from("test-secret".to_string()),
        };
        assert_config_error(&config, "OAuth client_id cannot be empty");
    }

    #[test]
    fn test_config_validation_empty_oauth_client_secret() {
        let mut config = create_test_config();
        config.auth = DatabricksAuthentication::OAuth {
            client_id: SensitiveString::from("test-client-id".to_string()),
            client_secret: SensitiveString::from(String::new()),
        };
        assert_config_error(&config, "OAuth client_secret cannot be empty");
    }

    /// The 10MB cap is inclusive: exactly 10MB is accepted, one byte more is rejected.
    #[test]
    fn test_config_validation_max_bytes_limit() {
        let mut config = create_test_config();

        config.batch.max_bytes = Some(10_000_000);
        assert!(config.validate().is_ok(), "exactly 10MB must be accepted");

        config.batch.max_bytes = Some(10_000_001);
        assert_config_error(&config, "max_bytes must be less than or equal to 10MB");
    }

    /// When `batch.max_bytes` is `None` (user omitted the field or set it to `null`),
    /// `into_batcher_settings()` must merge it against
    /// `RealtimeSizeBasedDefaultBatchSettings::MAX_BYTES` (10MB) — never unbounded.
    /// This guarantees the Zerobus SDK's 10MB limit cannot be exceeded at runtime
    /// even without an explicit user cap.
    #[test]
    fn test_batch_max_bytes_none_defaults_to_10mb() {
        let mut config = create_test_config();
        config.batch.max_bytes = None;

        let settings = config
            .batch
            .into_batcher_settings()
            .expect("batch settings should build");

        assert_eq!(settings.size_limit, 10_000_000);
    }
}