vector/sinks/pulsar/
config.rs

1use std::{path::Path, time::Duration};
2
3use futures_util::{FutureExt, TryFutureExt};
4use pulsar::{
5    Authentication, ConnectionRetryOptions, Error as PulsarError, OperationRetryOptions,
6    ProducerOptions, Pulsar, TokioExecutor,
7    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
8    compression,
9    error::AuthenticationError,
10    message::proto,
11};
12use vector_lib::{
13    codecs::{TextSerializerConfig, encoding::SerializerConfig},
14    config::DataType,
15    lookup::lookup_v2::OptionalTargetPath,
16    sensitive_string::SensitiveString,
17};
18use vrl::value::Kind;
19
20use crate::{
21    schema,
22    sinks::{
23        prelude::*,
24        pulsar::sink::{PulsarSink, healthcheck},
25    },
26};
27
28/// Configuration for the `pulsar` sink.
29#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
30#[derive(Clone, Debug)]
31pub struct PulsarSinkConfig {
32    /// The endpoint to which the Pulsar client should connect to.
33    ///
34    /// The endpoint should specify the pulsar protocol and port.
35    #[serde(alias = "address")]
36    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
37    pub(crate) endpoint: String,
38
39    /// The Pulsar topic name to write events to.
40    #[configurable(metadata(docs::examples = "topic-1234"))]
41    pub(crate) topic: Template,
42
43    /// The name of the producer. If not specified, the default name assigned by Pulsar is used.
44    #[configurable(metadata(docs::examples = "producer-name"))]
45    pub(crate) producer_name: Option<String>,
46
47    /// The log field name or tags key to use for the partition key.
48    ///
49    /// If the field does not exist in the log event or metric tags, a blank value will be used.
50    ///
51    /// If omitted, the key is not sent.
52    ///
53    /// Pulsar uses a hash of the key to choose the topic-partition or uses round-robin if the record has no key.
54    #[configurable(metadata(docs::examples = "message"))]
55    #[configurable(metadata(docs::examples = "my_field"))]
56    pub(crate) partition_key_field: Option<OptionalTargetPath>,
57
58    /// The log field name to use for the Pulsar properties key.
59    ///
60    /// If omitted, no properties will be written.
61    pub properties_key: Option<OptionalTargetPath>,
62
63    #[configurable(derived)]
64    #[serde(default)]
65    pub(crate) batch: PulsarBatchConfig,
66
67    #[configurable(derived)]
68    #[serde(default)]
69    pub compression: PulsarCompression,
70
71    #[configurable(derived)]
72    pub encoding: EncodingConfig,
73
74    #[configurable(derived)]
75    pub(crate) auth: Option<PulsarAuthConfig>,
76
77    #[configurable(derived)]
78    #[serde(
79        default,
80        deserialize_with = "crate::serde::bool_or_struct",
81        skip_serializing_if = "crate::serde::is_default"
82    )]
83    pub acknowledgements: AcknowledgementsConfig,
84
85    #[configurable(derived)]
86    #[serde(default)]
87    pub connection_retry_options: Option<CustomConnectionRetryOptions>,
88
89    #[configurable(derived)]
90    #[serde(default)]
91    pub(crate) tls: Option<PulsarTlsOptions>,
92}
93
94/// Event batching behavior.
95#[configurable_component]
96#[derive(Clone, Copy, Debug, Default)]
97pub(crate) struct PulsarBatchConfig {
98    /// The maximum amount of events in a batch before it is flushed.
99    ///
100    /// Note this is an unsigned 32 bit integer which is a smaller capacity than
101    /// many of the other sink batch settings.
102    #[configurable(metadata(docs::type_unit = "events"))]
103    #[configurable(metadata(docs::examples = 1000))]
104    pub max_events: Option<u32>,
105
106    /// The maximum size of a batch before it is flushed.
107    #[configurable(metadata(docs::type_unit = "bytes"))]
108    pub max_bytes: Option<usize>,
109}
110
111/// Authentication configuration.
112#[configurable_component]
113#[derive(Clone, Debug)]
114pub(crate) struct PulsarAuthConfig {
115    /// Basic authentication name/username.
116    ///
117    /// This can be used either for basic authentication (username/password) or JWT authentication.
118    /// When used for JWT, the value should be `token`.
119    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
120    #[configurable(metadata(docs::examples = "name123"))]
121    name: Option<String>,
122
123    /// Basic authentication password/token.
124    ///
125    /// This can be used either for basic authentication (username/password) or JWT authentication.
126    /// When used for JWT, the value should be the signed JWT, in the compact representation.
127    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
128    #[configurable(metadata(docs::examples = "123456789"))]
129    token: Option<SensitiveString>,
130
131    #[configurable(derived)]
132    oauth2: Option<OAuth2Config>,
133}
134
135/// OAuth2-specific authentication configuration.
136#[configurable_component]
137#[derive(Clone, Debug)]
138pub struct OAuth2Config {
139    /// The issuer URL.
140    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
141    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
142    issuer_url: String,
143
144    /// The credentials URL.
145    ///
146    /// A data URL is also supported.
147    #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
148    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
149    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
150    credentials_url: String,
151
152    /// The OAuth2 audience.
153    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
154    #[configurable(metadata(docs::examples = "pulsar"))]
155    audience: Option<String>,
156
157    /// The OAuth2 scope.
158    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
159    #[configurable(metadata(docs::examples = "admin"))]
160    scope: Option<String>,
161}
162
163/// Supported compression types for Pulsar.
164#[configurable_component]
165#[derive(Clone, Copy, Debug, Derivative)]
166#[derivative(Default)]
167#[serde(rename_all = "lowercase")]
168pub enum PulsarCompression {
169    /// No compression.
170    #[derivative(Default)]
171    None,
172
173    /// LZ4.
174    Lz4,
175
176    /// Zlib.
177    Zlib,
178
179    /// Zstandard.
180    Zstd,
181
182    /// Snappy.
183    Snappy,
184}
185
// Every field is optional: `PulsarSinkConfig::create_pulsar_client` substitutes the matching
// value from `pulsar::ConnectionRetryOptions::default()` for any field left unset.
#[configurable_component]
#[configurable(
    description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
    /// Minimum delay between connection retries.
    // Interpreted as milliseconds (`Duration::from_millis`), unlike the other durations in this
    // struct, which are whole seconds (`Duration::from_secs`).
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    pub min_backoff_ms: Option<u64>,

    /// Maximum delay between reconnection retries.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 30))]
    pub max_backoff_secs: Option<u64>,

    /// Maximum number of connection retries.
    #[configurable(metadata(docs::examples = 12))]
    pub max_retries: Option<u32>,

    /// Time limit to establish a connection.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 10))]
    pub connection_timeout_secs: Option<u64>,

    /// Keep-alive interval for each broker connection.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 60))]
    pub keep_alive_secs: Option<u64>,
}
215
// TLS settings are only applied to the client builder when the sink's `tls` section is
// present; see `PulsarSinkConfig::create_pulsar_client`.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
    /// File path containing a list of PEM encoded certificates.
    // Passed to the builder's `with_certificate_chain_file`; an error from it aborts client
    // creation.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
    // Defaults to `true`; the negated value is passed to `with_allow_insecure_connection`.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled when verify_certificate is false.
    ///
    /// Set to true if not specified.
    // NOTE(review): the description says this applies "when verify_certificate is false", but
    // `create_pulsar_client` passes it to `with_tls_hostname_verification_enabled`
    // unconditionally — confirm the intended semantics against the pulsar crate.
    pub verify_hostname: Option<bool>,
}
234
235impl Default for PulsarSinkConfig {
236    fn default() -> Self {
237        Self {
238            endpoint: "pulsar://127.0.0.1:6650".to_string(),
239            topic: Template::try_from("topic-1234")
240                .expect("Unable to parse default template topic"),
241            producer_name: None,
242            properties_key: None,
243            partition_key_field: None,
244            batch: Default::default(),
245            compression: Default::default(),
246            encoding: TextSerializerConfig::default().into(),
247            auth: None,
248            acknowledgements: Default::default(),
249            connection_retry_options: None,
250            tls: None,
251        }
252    }
253}
254
255impl PulsarSinkConfig {
256    pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
257        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
258        if let Some(auth) = &self.auth {
259            builder = match (
260                auth.name.as_ref(),
261                auth.token.as_ref(),
262                auth.oauth2.as_ref(),
263            ) {
264                (Some(name), Some(token), None) => builder.with_auth(Authentication {
265                    name: name.clone(),
266                    data: token.inner().as_bytes().to_vec(),
267                }),
268                (None, None, Some(oauth2)) => builder.with_auth_provider(
269                    OAuth2Authentication::client_credentials(OAuth2Params {
270                        issuer_url: oauth2.issuer_url.clone(),
271                        credentials_url: oauth2.credentials_url.clone(),
272                        audience: oauth2.audience.clone(),
273                        scope: oauth2.scope.clone(),
274                    }),
275                ),
276                _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
277                    "Invalid auth config: can only specify name and token or oauth2 configuration"
278                        .to_string(),
279                ))))?,
280            };
281        }
282
283        // Apply configuration for reconnection exponential backoff.
284        let default_retry_options = ConnectionRetryOptions::default();
285        let retry_options =
286            self.connection_retry_options
287                .as_ref()
288                .map_or(default_retry_options.clone(), |opts| {
289                    ConnectionRetryOptions {
290                        min_backoff: opts
291                            .min_backoff_ms
292                            .map_or(default_retry_options.min_backoff, |ms| {
293                                Duration::from_millis(ms)
294                            }),
295                        max_backoff: opts
296                            .max_backoff_secs
297                            .map_or(default_retry_options.max_backoff, |secs| {
298                                Duration::from_secs(secs)
299                            }),
300                        max_retries: opts
301                            .max_retries
302                            .unwrap_or(default_retry_options.max_retries),
303                        connection_timeout: opts
304                            .connection_timeout_secs
305                            .map_or(default_retry_options.connection_timeout, |secs| {
306                                Duration::from_secs(secs)
307                            }),
308                        keep_alive: opts
309                            .keep_alive_secs
310                            .map_or(default_retry_options.keep_alive, |secs| {
311                                Duration::from_secs(secs)
312                            }),
313                    }
314                });
315
316        builder = builder.with_connection_retry_options(retry_options);
317
318        // Apply configuration for retrying Pulsar operations.
319        let operation_retry_opts = OperationRetryOptions::default();
320        builder = builder.with_operation_retry_options(operation_retry_opts);
321
322        if let Some(options) = &self.tls {
323            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
324            builder =
325                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
326            builder = builder
327                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
328        }
329        builder.build().map_err(|e| e.into()).await
330    }
331
332    pub(crate) fn build_producer_options(&self) -> ProducerOptions {
333        let mut opts = ProducerOptions {
334            encrypted: None,
335            access_mode: Some(0),
336            metadata: Default::default(),
337            schema: None,
338            batch_size: self.batch.max_events,
339            batch_byte_size: self.batch.max_bytes,
340            compression: None,
341        };
342
343        match &self.compression {
344            PulsarCompression::None => opts.compression = Some(compression::Compression::None),
345            PulsarCompression::Lz4 => {
346                opts.compression = Some(compression::Compression::Lz4(
347                    compression::CompressionLz4::default(),
348                ))
349            }
350            PulsarCompression::Zlib => {
351                opts.compression = Some(compression::Compression::Zlib(
352                    compression::CompressionZlib::default(),
353                ))
354            }
355            PulsarCompression::Zstd => {
356                opts.compression = Some(compression::Compression::Zstd(
357                    compression::CompressionZstd::default(),
358                ))
359            }
360            PulsarCompression::Snappy => {
361                opts.compression = Some(compression::Compression::Snappy(
362                    compression::CompressionSnappy::default(),
363                ))
364            }
365        }
366
367        if let SerializerConfig::Avro { avro } = self.encoding.config() {
368            opts.schema = Some(proto::Schema {
369                schema_data: avro.schema.as_bytes().into(),
370                r#type: proto::schema::Type::Avro as i32,
371                ..Default::default()
372            });
373        }
374        opts
375    }
376}
377
378impl GenerateConfig for PulsarSinkConfig {
379    fn generate_config() -> toml::Value {
380        toml::Value::try_from(Self::default()).unwrap()
381    }
382}
383
384#[async_trait::async_trait]
385#[typetag::serde(name = "pulsar")]
386impl SinkConfig for PulsarSinkConfig {
387    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
388        let client = self
389            .create_pulsar_client()
390            .await
391            .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
392
393        let sink = PulsarSink::new(client, self.clone())?;
394        let hc = healthcheck(self.clone()).boxed();
395
396        Ok((VectorSink::from_event_streamsink(sink), hc))
397    }
398
399    fn input(&self) -> Input {
400        let requirement =
401            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
402
403        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
404            .with_schema_requirement(requirement)
405    }
406
407    fn acknowledgements(&self) -> &AcknowledgementsConfig {
408        &self.acknowledgements
409    }
410}