vector/sinks/pulsar/
config.rs

1use crate::{
2    schema,
3    sinks::{
4        prelude::*,
5        pulsar::sink::{healthcheck, PulsarSink},
6    },
7};
8use futures_util::{FutureExt, TryFutureExt};
9use pulsar::{
10    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
11    compression,
12    message::proto,
13    Authentication, ConnectionRetryOptions, Error as PulsarError, ProducerOptions, Pulsar,
14    TokioExecutor,
15};
16use pulsar::{error::AuthenticationError, OperationRetryOptions};
17use std::path::Path;
18use std::time::Duration;
19use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig};
20use vector_lib::config::DataType;
21use vector_lib::lookup::lookup_v2::OptionalTargetPath;
22use vector_lib::sensitive_string::SensitiveString;
23use vrl::value::Kind;
24
25/// Configuration for the `pulsar` sink.
26#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
27#[derive(Clone, Debug)]
28pub struct PulsarSinkConfig {
29    /// The endpoint to which the Pulsar client should connect to.
30    ///
31    /// The endpoint should specify the pulsar protocol and port.
32    #[serde(alias = "address")]
33    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
34    pub(crate) endpoint: String,
35
36    /// The Pulsar topic name to write events to.
37    #[configurable(metadata(docs::examples = "topic-1234"))]
38    pub(crate) topic: Template,
39
40    /// The name of the producer. If not specified, the default name assigned by Pulsar is used.
41    #[configurable(metadata(docs::examples = "producer-name"))]
42    pub(crate) producer_name: Option<String>,
43
44    /// The log field name or tags key to use for the partition key.
45    ///
46    /// If the field does not exist in the log event or metric tags, a blank value will be used.
47    ///
48    /// If omitted, the key is not sent.
49    ///
50    /// Pulsar uses a hash of the key to choose the topic-partition or uses round-robin if the record has no key.
51    #[configurable(metadata(docs::examples = "message"))]
52    #[configurable(metadata(docs::examples = "my_field"))]
53    pub(crate) partition_key_field: Option<OptionalTargetPath>,
54
55    /// The log field name to use for the Pulsar properties key.
56    ///
57    /// If omitted, no properties will be written.
58    pub properties_key: Option<OptionalTargetPath>,
59
60    #[configurable(derived)]
61    #[serde(default)]
62    pub(crate) batch: PulsarBatchConfig,
63
64    #[configurable(derived)]
65    #[serde(default)]
66    pub compression: PulsarCompression,
67
68    #[configurable(derived)]
69    pub encoding: EncodingConfig,
70
71    #[configurable(derived)]
72    pub(crate) auth: Option<PulsarAuthConfig>,
73
74    #[configurable(derived)]
75    #[serde(
76        default,
77        deserialize_with = "crate::serde::bool_or_struct",
78        skip_serializing_if = "crate::serde::is_default"
79    )]
80    pub acknowledgements: AcknowledgementsConfig,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub connection_retry_options: Option<CustomConnectionRetryOptions>,
85
86    #[configurable(derived)]
87    #[serde(default)]
88    pub(crate) tls: Option<PulsarTlsOptions>,
89}
90
91/// Event batching behavior.
92#[configurable_component]
93#[derive(Clone, Copy, Debug, Default)]
94pub(crate) struct PulsarBatchConfig {
95    /// The maximum amount of events in a batch before it is flushed.
96    ///
97    /// Note this is an unsigned 32 bit integer which is a smaller capacity than
98    /// many of the other sink batch settings.
99    #[configurable(metadata(docs::type_unit = "events"))]
100    #[configurable(metadata(docs::examples = 1000))]
101    pub max_events: Option<u32>,
102
103    /// The maximum size of a batch before it is flushed.
104    #[configurable(metadata(docs::type_unit = "bytes"))]
105    pub max_bytes: Option<usize>,
106}
107
108/// Authentication configuration.
109#[configurable_component]
110#[derive(Clone, Debug)]
111pub(crate) struct PulsarAuthConfig {
112    /// Basic authentication name/username.
113    ///
114    /// This can be used either for basic authentication (username/password) or JWT authentication.
115    /// When used for JWT, the value should be `token`.
116    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
117    #[configurable(metadata(docs::examples = "name123"))]
118    name: Option<String>,
119
120    /// Basic authentication password/token.
121    ///
122    /// This can be used either for basic authentication (username/password) or JWT authentication.
123    /// When used for JWT, the value should be the signed JWT, in the compact representation.
124    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
125    #[configurable(metadata(docs::examples = "123456789"))]
126    token: Option<SensitiveString>,
127
128    #[configurable(derived)]
129    oauth2: Option<OAuth2Config>,
130}
131
132/// OAuth2-specific authentication configuration.
133#[configurable_component]
134#[derive(Clone, Debug)]
135pub struct OAuth2Config {
136    /// The issuer URL.
137    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
138    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
139    issuer_url: String,
140
141    /// The credentials URL.
142    ///
143    /// A data URL is also supported.
144    #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
145    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
146    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
147    credentials_url: String,
148
149    /// The OAuth2 audience.
150    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
151    #[configurable(metadata(docs::examples = "pulsar"))]
152    audience: Option<String>,
153
154    /// The OAuth2 scope.
155    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
156    #[configurable(metadata(docs::examples = "admin"))]
157    scope: Option<String>,
158}
159
160/// Supported compression types for Pulsar.
161#[configurable_component]
162#[derive(Clone, Copy, Debug, Derivative)]
163#[derivative(Default)]
164#[serde(rename_all = "lowercase")]
165pub enum PulsarCompression {
166    /// No compression.
167    #[derivative(Default)]
168    None,
169
170    /// LZ4.
171    Lz4,
172
173    /// Zlib.
174    Zlib,
175
176    /// Zstandard.
177    Zstd,
178
179    /// Snappy.
180    Snappy,
181}
182
183#[configurable_component]
184#[configurable(
185    description = "Custom connection retry options configuration for the Pulsar client."
186)]
187#[derive(Clone, Debug)]
188pub struct CustomConnectionRetryOptions {
189    /// Minimum delay between connection retries.
190    #[configurable(metadata(docs::type_unit = "milliseconds"))]
191    pub min_backoff_ms: Option<u64>,
192
193    /// Maximum delay between reconnection retries.
194    #[configurable(metadata(docs::type_unit = "seconds"))]
195    #[configurable(metadata(docs::examples = 30))]
196    pub max_backoff_secs: Option<u64>,
197
198    /// Maximum number of connection retries.
199    #[configurable(metadata(docs::examples = 12))]
200    pub max_retries: Option<u32>,
201
202    /// Time limit to establish a connection.
203    #[configurable(metadata(docs::type_unit = "seconds"))]
204    #[configurable(metadata(docs::examples = 10))]
205    pub connection_timeout_secs: Option<u64>,
206
207    /// Keep-alive interval for each broker connection.
208    #[configurable(metadata(docs::type_unit = "seconds"))]
209    #[configurable(metadata(docs::examples = 60))]
210    pub keep_alive_secs: Option<u64>,
211}
212
213#[configurable_component]
214#[configurable(description = "TLS options configuration for the Pulsar client.")]
215#[derive(Clone, Debug)]
216pub struct PulsarTlsOptions {
217    /// File path containing a list of PEM encoded certificates.
218    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
219    pub ca_file: String,
220
221    /// Enables certificate verification.
222    ///
223    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
224    pub verify_certificate: Option<bool>,
225
226    /// Whether hostname verification is enabled when verify_certificate is false.
227    ///
228    /// Set to true if not specified.
229    pub verify_hostname: Option<bool>,
230}
231
232impl Default for PulsarSinkConfig {
233    fn default() -> Self {
234        Self {
235            endpoint: "pulsar://127.0.0.1:6650".to_string(),
236            topic: Template::try_from("topic-1234")
237                .expect("Unable to parse default template topic"),
238            producer_name: None,
239            properties_key: None,
240            partition_key_field: None,
241            batch: Default::default(),
242            compression: Default::default(),
243            encoding: TextSerializerConfig::default().into(),
244            auth: None,
245            acknowledgements: Default::default(),
246            connection_retry_options: None,
247            tls: None,
248        }
249    }
250}
251
252impl PulsarSinkConfig {
253    pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
254        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
255        if let Some(auth) = &self.auth {
256            builder = match (
257                auth.name.as_ref(),
258                auth.token.as_ref(),
259                auth.oauth2.as_ref(),
260            ) {
261                (Some(name), Some(token), None) => builder.with_auth(Authentication {
262                    name: name.clone(),
263                    data: token.inner().as_bytes().to_vec(),
264                }),
265                (None, None, Some(oauth2)) => builder.with_auth_provider(
266                    OAuth2Authentication::client_credentials(OAuth2Params {
267                        issuer_url: oauth2.issuer_url.clone(),
268                        credentials_url: oauth2.credentials_url.clone(),
269                        audience: oauth2.audience.clone(),
270                        scope: oauth2.scope.clone(),
271                    }),
272                ),
273                _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
274                    "Invalid auth config: can only specify name and token or oauth2 configuration"
275                        .to_string(),
276                ))))?,
277            };
278        }
279
280        // Apply configuration for reconnection exponential backoff.
281        let default_retry_options = ConnectionRetryOptions::default();
282        let retry_options =
283            self.connection_retry_options
284                .as_ref()
285                .map_or(default_retry_options.clone(), |opts| {
286                    ConnectionRetryOptions {
287                        min_backoff: opts
288                            .min_backoff_ms
289                            .map_or(default_retry_options.min_backoff, |ms| {
290                                Duration::from_millis(ms)
291                            }),
292                        max_backoff: opts
293                            .max_backoff_secs
294                            .map_or(default_retry_options.max_backoff, |secs| {
295                                Duration::from_secs(secs)
296                            }),
297                        max_retries: opts
298                            .max_retries
299                            .unwrap_or(default_retry_options.max_retries),
300                        connection_timeout: opts
301                            .connection_timeout_secs
302                            .map_or(default_retry_options.connection_timeout, |secs| {
303                                Duration::from_secs(secs)
304                            }),
305                        keep_alive: opts
306                            .keep_alive_secs
307                            .map_or(default_retry_options.keep_alive, |secs| {
308                                Duration::from_secs(secs)
309                            }),
310                    }
311                });
312
313        builder = builder.with_connection_retry_options(retry_options);
314
315        // Apply configuration for retrying Pulsar operations.
316        let operation_retry_opts = OperationRetryOptions::default();
317        builder = builder.with_operation_retry_options(operation_retry_opts);
318
319        if let Some(options) = &self.tls {
320            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
321            builder =
322                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
323            builder = builder
324                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
325        }
326        builder.build().map_err(|e| e.into()).await
327    }
328
329    pub(crate) fn build_producer_options(&self) -> ProducerOptions {
330        let mut opts = ProducerOptions {
331            encrypted: None,
332            access_mode: Some(0),
333            metadata: Default::default(),
334            schema: None,
335            batch_size: self.batch.max_events,
336            batch_byte_size: self.batch.max_bytes,
337            compression: None,
338        };
339
340        match &self.compression {
341            PulsarCompression::None => opts.compression = Some(compression::Compression::None),
342            PulsarCompression::Lz4 => {
343                opts.compression = Some(compression::Compression::Lz4(
344                    compression::CompressionLz4::default(),
345                ))
346            }
347            PulsarCompression::Zlib => {
348                opts.compression = Some(compression::Compression::Zlib(
349                    compression::CompressionZlib::default(),
350                ))
351            }
352            PulsarCompression::Zstd => {
353                opts.compression = Some(compression::Compression::Zstd(
354                    compression::CompressionZstd::default(),
355                ))
356            }
357            PulsarCompression::Snappy => {
358                opts.compression = Some(compression::Compression::Snappy(
359                    compression::CompressionSnappy::default(),
360                ))
361            }
362        }
363
364        if let SerializerConfig::Avro { avro } = self.encoding.config() {
365            opts.schema = Some(proto::Schema {
366                schema_data: avro.schema.as_bytes().into(),
367                r#type: proto::schema::Type::Avro as i32,
368                ..Default::default()
369            });
370        }
371        opts
372    }
373}
374
375impl GenerateConfig for PulsarSinkConfig {
376    fn generate_config() -> toml::Value {
377        toml::Value::try_from(Self::default()).unwrap()
378    }
379}
380
381#[async_trait::async_trait]
382#[typetag::serde(name = "pulsar")]
383impl SinkConfig for PulsarSinkConfig {
384    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
385        let client = self
386            .create_pulsar_client()
387            .await
388            .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
389
390        let sink = PulsarSink::new(client, self.clone())?;
391        let hc = healthcheck(self.clone()).boxed();
392
393        Ok((VectorSink::from_event_streamsink(sink), hc))
394    }
395
396    fn input(&self) -> Input {
397        let requirement =
398            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
399
400        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
401            .with_schema_requirement(requirement)
402    }
403
404    fn acknowledgements(&self) -> &AcknowledgementsConfig {
405        &self.acknowledgements
406    }
407}