vector/sinks/pulsar/
config.rs

1use std::{path::Path, time::Duration};
2
3use futures_util::{FutureExt, TryFutureExt};
4use pulsar::{
5    Authentication, ConnectionRetryOptions, Error as PulsarError, OperationRetryOptions,
6    ProducerOptions, Pulsar, TokioExecutor,
7    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
8    compression,
9    error::AuthenticationError,
10    message::proto,
11};
12use vector_lib::{
13    codecs::{TextSerializerConfig, encoding::SerializerConfig},
14    config::DataType,
15    lookup::lookup_v2::OptionalTargetPath,
16    sensitive_string::SensitiveString,
17};
18use vrl::value::Kind;
19
20use crate::{
21    schema,
22    sinks::{
23        prelude::*,
24        pulsar::sink::{PulsarSink, healthcheck},
25    },
26};
27
/// Configuration for the `pulsar` sink.
#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
#[derive(Clone, Debug)]
pub struct PulsarSinkConfig {
    /// The endpoint to which the Pulsar client should connect.
    ///
    /// The endpoint should specify the pulsar protocol and port.
    // Also accepted under the key `address` through the serde alias below.
    #[serde(alias = "address")]
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    pub(crate) endpoint: String,

    /// The Pulsar topic name to write events to.
    #[configurable(metadata(docs::examples = "topic-1234"))]
    pub(crate) topic: Template,

    /// The name of the producer. If not specified, the default name assigned by Pulsar is used.
    #[configurable(metadata(docs::examples = "producer-name"))]
    pub(crate) producer_name: Option<String>,

    /// The log field name or tags key to use for the partition key.
    ///
    /// If the field does not exist in the log event or metric tags, a blank value will be used.
    ///
    /// If omitted, the key is not sent.
    ///
    /// Pulsar uses a hash of the key to choose the topic-partition or uses round-robin if the record has no key.
    #[configurable(metadata(docs::examples = "message"))]
    #[configurable(metadata(docs::examples = "my_field"))]
    pub(crate) partition_key_field: Option<OptionalTargetPath>,

    /// The log field name to use for the Pulsar properties key.
    ///
    /// If omitted, no properties will be written.
    pub properties_key: Option<OptionalTargetPath>,

    // Batch limits; forwarded to the producer options as `batch_size` / `batch_byte_size`.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) batch: PulsarBatchConfig,

    // Compression codec applied by the producer; defaults to `PulsarCompression::None`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: PulsarCompression,

    // Event encoding. An Avro encoding additionally attaches its schema to the producer options.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Optional authentication; when absent the client is built without credentials.
    #[configurable(derived)]
    pub(crate) auth: Option<PulsarAuthConfig>,

    // Deserialized through `crate::serde::bool_or_struct` and skipped during
    // serialization when `crate::serde::is_default` holds.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Optional overrides for the client's connection retry behaviour; unset
    // values fall back to the Pulsar client's defaults.
    #[configurable(derived)]
    #[serde(default)]
    pub connection_retry_options: Option<CustomConnectionRetryOptions>,

    // Optional TLS settings; when absent no TLS options are applied to the client builder.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) tls: Option<PulsarTlsOptions>,
}
93
/// Event batching behavior.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct PulsarBatchConfig {
    /// The maximum number of events in a batch before it is flushed.
    ///
    /// Note this is an unsigned 32 bit integer which is a smaller capacity than
    /// many of the other sink batch settings.
    // Passed to the producer options as `batch_size`.
    #[configurable(metadata(docs::type_unit = "events"))]
    #[configurable(metadata(docs::examples = 1000))]
    pub max_events: Option<u32>,

    /// The maximum size of a batch before it is flushed.
    // Passed to the producer options as `batch_byte_size`.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,
}
110
/// Authentication configuration.
///
/// Either `name` and `token` must both be set (with `oauth2` unset), or only `oauth2` must be
/// set. Any other combination is rejected when the Pulsar client is created.
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct PulsarAuthConfig {
    /// Basic authentication name/username.
    ///
    /// This can be used either for basic authentication (username/password) or JWT authentication.
    /// When used for JWT, the value should be `token`.
    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
    #[configurable(metadata(docs::examples = "name123"))]
    name: Option<String>,

    /// Basic authentication password/token.
    ///
    /// This can be used either for basic authentication (username/password) or JWT authentication.
    /// When used for JWT, the value should be the signed JWT, in the compact representation.
    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123456789"))]
    token: Option<SensitiveString>,

    // OAuth2 client-credentials settings; mutually exclusive with `name`/`token`.
    #[configurable(derived)]
    oauth2: Option<OAuth2Config>,
}
134
135/// OAuth2-specific authentication configuration.
136#[configurable_component]
137#[derive(Clone, Debug)]
138pub struct OAuth2Config {
139    /// The issuer URL.
140    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
141    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
142    issuer_url: String,
143
144    /// The credentials URL.
145    ///
146    /// A data URL is also supported.
147    #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
148    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
149    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
150    credentials_url: String,
151
152    /// The OAuth2 audience.
153    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
154    #[configurable(metadata(docs::examples = "pulsar"))]
155    audience: Option<String>,
156
157    /// The OAuth2 scope.
158    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
159    #[configurable(metadata(docs::examples = "admin"))]
160    scope: Option<String>,
161}
162
/// Supported compression types for Pulsar.
// Variant names are (de)serialized in lowercase (`none`, `lz4`, `zlib`, `zstd`,
// `snappy`) because of `rename_all = "lowercase"`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "lowercase")]
pub enum PulsarCompression {
    /// No compression.
    // Used when the `compression` option is omitted.
    #[default]
    None,

    /// LZ4.
    Lz4,

    /// Zlib.
    Zlib,

    /// Zstandard.
    Zstd,

    /// Snappy.
    Snappy,
}
184
// User-facing overrides for the Pulsar client's `ConnectionRetryOptions`.
// Every field is optional; a field left unset keeps the Pulsar client's own
// default when the client is created.
#[configurable_component]
#[configurable(
    description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
    /// Minimum delay between connection retries.
    // Note the unit: milliseconds, unlike the other durations in this struct.
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    pub min_backoff_ms: Option<u64>,

    /// Maximum delay between reconnection retries.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 30))]
    pub max_backoff_secs: Option<u64>,

    /// Maximum number of connection retries.
    #[configurable(metadata(docs::examples = 12))]
    pub max_retries: Option<u32>,

    /// Time limit to establish a connection.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 10))]
    pub connection_timeout_secs: Option<u64>,

    /// Keep-alive interval for each broker connection.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 60))]
    pub keep_alive_secs: Option<u64>,
}
214
// TLS settings applied to the Pulsar client builder when the `tls` option is present.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
    /// File path containing a list of PEM encoded certificates.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
    // Treated as `true` when unset; the negated value is passed to the
    // builder's "allow insecure connection" switch.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled when verify_certificate is false.
    ///
    /// Set to true if not specified.
    pub verify_hostname: Option<bool>,
}
233
234impl Default for PulsarSinkConfig {
235    fn default() -> Self {
236        Self {
237            endpoint: "pulsar://127.0.0.1:6650".to_string(),
238            topic: Template::try_from("topic-1234")
239                .expect("Unable to parse default template topic"),
240            producer_name: None,
241            properties_key: None,
242            partition_key_field: None,
243            batch: Default::default(),
244            compression: Default::default(),
245            encoding: TextSerializerConfig::default().into(),
246            auth: None,
247            acknowledgements: Default::default(),
248            connection_retry_options: None,
249            tls: None,
250        }
251    }
252}
253
254impl PulsarSinkConfig {
255    pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
256        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
257        if let Some(auth) = &self.auth {
258            builder = match (
259                auth.name.as_ref(),
260                auth.token.as_ref(),
261                auth.oauth2.as_ref(),
262            ) {
263                (Some(name), Some(token), None) => builder.with_auth(Authentication {
264                    name: name.clone(),
265                    data: token.inner().as_bytes().to_vec(),
266                }),
267                (None, None, Some(oauth2)) => builder.with_auth_provider(
268                    OAuth2Authentication::client_credentials(OAuth2Params {
269                        issuer_url: oauth2.issuer_url.clone(),
270                        credentials_url: oauth2.credentials_url.clone(),
271                        audience: oauth2.audience.clone(),
272                        scope: oauth2.scope.clone(),
273                    }),
274                ),
275                _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
276                    "Invalid auth config: can only specify name and token or oauth2 configuration"
277                        .to_string(),
278                ))))?,
279            };
280        }
281
282        // Apply configuration for reconnection exponential backoff.
283        let default_retry_options = ConnectionRetryOptions::default();
284        let retry_options =
285            self.connection_retry_options
286                .as_ref()
287                .map_or(default_retry_options.clone(), |opts| {
288                    ConnectionRetryOptions {
289                        min_backoff: opts
290                            .min_backoff_ms
291                            .map_or(default_retry_options.min_backoff, |ms| {
292                                Duration::from_millis(ms)
293                            }),
294                        max_backoff: opts
295                            .max_backoff_secs
296                            .map_or(default_retry_options.max_backoff, |secs| {
297                                Duration::from_secs(secs)
298                            }),
299                        max_retries: opts
300                            .max_retries
301                            .unwrap_or(default_retry_options.max_retries),
302                        connection_timeout: opts
303                            .connection_timeout_secs
304                            .map_or(default_retry_options.connection_timeout, |secs| {
305                                Duration::from_secs(secs)
306                            }),
307                        keep_alive: opts
308                            .keep_alive_secs
309                            .map_or(default_retry_options.keep_alive, |secs| {
310                                Duration::from_secs(secs)
311                            }),
312                        connection_max_idle: Default::default(),
313                    }
314                });
315
316        builder = builder.with_connection_retry_options(retry_options);
317
318        // Apply configuration for retrying Pulsar operations.
319        let operation_retry_opts = OperationRetryOptions::default();
320        builder = builder.with_operation_retry_options(operation_retry_opts);
321
322        if let Some(options) = &self.tls {
323            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
324            builder =
325                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
326            builder = builder
327                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
328        }
329        builder.build().map_err(|e| e.into()).await
330    }
331
332    pub(crate) fn build_producer_options(&self) -> ProducerOptions {
333        let mut opts = ProducerOptions {
334            encrypted: None,
335            access_mode: Some(0),
336            metadata: Default::default(),
337            schema: None,
338            batch_size: self.batch.max_events,
339            batch_byte_size: self.batch.max_bytes,
340            compression: None,
341            batch_timeout: Default::default(),
342            block_queue_if_full: Default::default(),
343            routing_policy: Default::default(),
344        };
345
346        match &self.compression {
347            PulsarCompression::None => opts.compression = Some(compression::Compression::None),
348            PulsarCompression::Lz4 => {
349                opts.compression = Some(compression::Compression::Lz4(
350                    compression::CompressionLz4::default(),
351                ))
352            }
353            PulsarCompression::Zlib => {
354                opts.compression = Some(compression::Compression::Zlib(
355                    compression::CompressionZlib::default(),
356                ))
357            }
358            PulsarCompression::Zstd => {
359                opts.compression = Some(compression::Compression::Zstd(
360                    compression::CompressionZstd::default(),
361                ))
362            }
363            PulsarCompression::Snappy => {
364                opts.compression = Some(compression::Compression::Snappy(
365                    compression::CompressionSnappy::default(),
366                ))
367            }
368        }
369
370        if let SerializerConfig::Avro { avro } = self.encoding.config() {
371            opts.schema = Some(proto::Schema {
372                schema_data: avro.schema.as_bytes().into(),
373                r#type: proto::schema::Type::Avro as i32,
374                ..Default::default()
375            });
376        }
377        opts
378    }
379}
380
381impl GenerateConfig for PulsarSinkConfig {
382    fn generate_config() -> toml::Value {
383        toml::Value::try_from(Self::default()).unwrap()
384    }
385}
386
387#[async_trait::async_trait]
388#[typetag::serde(name = "pulsar")]
389impl SinkConfig for PulsarSinkConfig {
390    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
391        let client = self
392            .create_pulsar_client()
393            .await
394            .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
395
396        let sink = PulsarSink::new(client, self.clone())?;
397        let hc = healthcheck(self.clone()).boxed();
398
399        Ok((VectorSink::from_event_streamsink(sink), hc))
400    }
401
402    fn input(&self) -> Input {
403        let requirement =
404            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
405
406        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
407            .with_schema_requirement(requirement)
408    }
409
410    fn acknowledgements(&self) -> &AcknowledgementsConfig {
411        &self.acknowledgements
412    }
413}