1use std::{path::Path, time::Duration};
2
3use futures_util::{FutureExt, TryFutureExt};
4use pulsar::{
5 Authentication, ConnectionRetryOptions, Error as PulsarError, OperationRetryOptions,
6 ProducerOptions, Pulsar, TokioExecutor,
7 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
8 compression,
9 error::AuthenticationError,
10 message::proto,
11};
12use vector_lib::{
13 codecs::{TextSerializerConfig, encoding::SerializerConfig},
14 config::DataType,
15 lookup::lookup_v2::OptionalTargetPath,
16 sensitive_string::SensitiveString,
17};
18use vrl::value::Kind;
19
20use crate::{
21 schema,
22 sinks::{
23 prelude::*,
24 pulsar::sink::{PulsarSink, healthcheck},
25 },
26};
27
/// Configuration for the `pulsar` sink.
#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
#[derive(Clone, Debug)]
pub struct PulsarSinkConfig {
    /// The endpoint that the Pulsar client connects to.
    ///
    /// The `address` key is accepted as an alias.
    #[serde(alias = "address")]
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    pub(crate) endpoint: String,

    /// The Pulsar topic to publish events to, expressed as a template.
    #[configurable(metadata(docs::examples = "topic-1234"))]
    pub(crate) topic: Template,

    /// An optional name for the Pulsar producer.
    // NOTE(review): this field is not consumed in this file; what happens when it is
    // unset is decided by the sink implementation — confirm there.
    #[configurable(metadata(docs::examples = "producer-name"))]
    pub(crate) producer_name: Option<String>,

    /// An optional path to the event field used as the message partition key.
    // NOTE(review): semantics inferred from the field name and examples; the lookup
    // itself happens outside this file — confirm behavior for missing fields.
    #[configurable(metadata(docs::examples = "message"))]
    #[configurable(metadata(docs::examples = "my_field"))]
    pub(crate) partition_key_field: Option<OptionalTargetPath>,

    /// An optional path to the event field used for the message properties.
    // NOTE(review): inferred from the field name; the expected shape of the referenced
    // value is not visible here — confirm against the sink implementation.
    pub properties_key: Option<OptionalTargetPath>,

    /// Batching limits forwarded to the producer options.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) batch: PulsarBatchConfig,

    /// The compression codec configured on the producer.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: PulsarCompression,

    /// How events are encoded.
    ///
    /// When the Avro serializer is selected, its schema is attached to the producer options.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// Optional authentication settings: either `name` and `token`, or `oauth2`.
    #[configurable(derived)]
    pub(crate) auth: Option<PulsarAuthConfig>,

    /// Acknowledgement settings for this sink.
    ///
    /// Deserialized through `bool_or_struct` and omitted from serialized output when default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Optional overrides for the client's connection retry behavior.
    #[configurable(derived)]
    #[serde(default)]
    pub connection_retry_options: Option<CustomConnectionRetryOptions>,

    /// Optional TLS settings for the client connection.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) tls: Option<PulsarTlsOptions>,
}
93
/// Event batching behavior.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct PulsarBatchConfig {
    /// The maximum number of events in a batch.
    ///
    /// Forwarded to the producer options as `batch_size`; left unset when `None`.
    #[configurable(metadata(docs::type_unit = "events"))]
    #[configurable(metadata(docs::examples = 1000))]
    pub max_events: Option<u32>,

    /// The maximum size of a batch, in bytes.
    ///
    /// Forwarded to the producer options as `batch_byte_size`; left unset when `None`.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,
}
110
/// Authentication configuration.
///
/// Exactly one of two forms is accepted when the client is built: `name` together with
/// `token`, or `oauth2` on its own. Any other combination is rejected.
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct PulsarAuthConfig {
    /// The authentication name, used together with `token`.
    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
    #[configurable(metadata(docs::examples = "name123"))]
    name: Option<String>,

    /// The authentication token, used together with `name`.
    ///
    /// Its bytes are passed to the client as the authentication data.
    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123456789"))]
    token: Option<SensitiveString>,

    /// OAuth2-specific authentication configuration.
    #[configurable(derived)]
    oauth2: Option<OAuth2Config>,
}
134
135#[configurable_component]
137#[derive(Clone, Debug)]
138pub struct OAuth2Config {
139 #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
141 #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
142 issuer_url: String,
143
144 #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
148 #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
149 #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
150 credentials_url: String,
151
152 #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
154 #[configurable(metadata(docs::examples = "pulsar"))]
155 audience: Option<String>,
156
157 #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
159 #[configurable(metadata(docs::examples = "admin"))]
160 scope: Option<String>,
161}
162
/// Supported compression types for Pulsar.
///
/// Variants are (de)serialized in lowercase, for example `zstd`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "lowercase")]
pub enum PulsarCompression {
    /// No compression. This is the default.
    #[default]
    None,

    /// LZ4 compression.
    Lz4,

    /// Zlib compression.
    Zlib,

    /// Zstandard compression.
    Zstd,

    /// Snappy compression.
    Snappy,
}
184
// User-facing overrides for the Pulsar client's `ConnectionRetryOptions`.
// The struct-level description is supplied through the `description` attribute below.
#[configurable_component]
#[configurable(
    description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
    /// Minimum backoff, in milliseconds.
    ///
    /// Maps to the client's `min_backoff`; the client default is kept when unset.
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    pub min_backoff_ms: Option<u64>,

    /// Maximum backoff, in seconds.
    ///
    /// Maps to the client's `max_backoff`; the client default is kept when unset.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 30))]
    pub max_backoff_secs: Option<u64>,

    /// Maximum number of connection retries.
    ///
    /// Maps to the client's `max_retries`; the client default is kept when unset.
    #[configurable(metadata(docs::examples = 12))]
    pub max_retries: Option<u32>,

    /// Connection timeout, in seconds.
    ///
    /// Maps to the client's `connection_timeout`; the client default is kept when unset.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 10))]
    pub connection_timeout_secs: Option<u64>,

    /// Keep-alive value, in seconds.
    ///
    /// Maps to the client's `keep_alive`; the client default is kept when unset.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 60))]
    pub keep_alive_secs: Option<u64>,
}
214
// TLS settings applied to the Pulsar client builder.
// The struct-level description is supplied through the `description` attribute below.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
    /// Path to the certificate chain file handed to the client.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Whether the server certificate is verified.
    ///
    /// Treated as `true` when unset; `false` allows insecure connections.
    pub verify_certificate: Option<bool>,

    /// Whether TLS hostname verification is enabled.
    ///
    /// Treated as `true` when unset.
    pub verify_hostname: Option<bool>,
}
233
234impl Default for PulsarSinkConfig {
235 fn default() -> Self {
236 Self {
237 endpoint: "pulsar://127.0.0.1:6650".to_string(),
238 topic: Template::try_from("topic-1234")
239 .expect("Unable to parse default template topic"),
240 producer_name: None,
241 properties_key: None,
242 partition_key_field: None,
243 batch: Default::default(),
244 compression: Default::default(),
245 encoding: TextSerializerConfig::default().into(),
246 auth: None,
247 acknowledgements: Default::default(),
248 connection_retry_options: None,
249 tls: None,
250 }
251 }
252}
253
254impl PulsarSinkConfig {
255 pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
256 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
257 if let Some(auth) = &self.auth {
258 builder = match (
259 auth.name.as_ref(),
260 auth.token.as_ref(),
261 auth.oauth2.as_ref(),
262 ) {
263 (Some(name), Some(token), None) => builder.with_auth(Authentication {
264 name: name.clone(),
265 data: token.inner().as_bytes().to_vec(),
266 }),
267 (None, None, Some(oauth2)) => builder.with_auth_provider(
268 OAuth2Authentication::client_credentials(OAuth2Params {
269 issuer_url: oauth2.issuer_url.clone(),
270 credentials_url: oauth2.credentials_url.clone(),
271 audience: oauth2.audience.clone(),
272 scope: oauth2.scope.clone(),
273 }),
274 ),
275 _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
276 "Invalid auth config: can only specify name and token or oauth2 configuration"
277 .to_string(),
278 ))))?,
279 };
280 }
281
282 let default_retry_options = ConnectionRetryOptions::default();
284 let retry_options =
285 self.connection_retry_options
286 .as_ref()
287 .map_or(default_retry_options.clone(), |opts| {
288 ConnectionRetryOptions {
289 min_backoff: opts
290 .min_backoff_ms
291 .map_or(default_retry_options.min_backoff, |ms| {
292 Duration::from_millis(ms)
293 }),
294 max_backoff: opts
295 .max_backoff_secs
296 .map_or(default_retry_options.max_backoff, |secs| {
297 Duration::from_secs(secs)
298 }),
299 max_retries: opts
300 .max_retries
301 .unwrap_or(default_retry_options.max_retries),
302 connection_timeout: opts
303 .connection_timeout_secs
304 .map_or(default_retry_options.connection_timeout, |secs| {
305 Duration::from_secs(secs)
306 }),
307 keep_alive: opts
308 .keep_alive_secs
309 .map_or(default_retry_options.keep_alive, |secs| {
310 Duration::from_secs(secs)
311 }),
312 connection_max_idle: Default::default(),
313 }
314 });
315
316 builder = builder.with_connection_retry_options(retry_options);
317
318 let operation_retry_opts = OperationRetryOptions::default();
320 builder = builder.with_operation_retry_options(operation_retry_opts);
321
322 if let Some(options) = &self.tls {
323 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
324 builder =
325 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
326 builder = builder
327 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
328 }
329 builder.build().map_err(|e| e.into()).await
330 }
331
332 pub(crate) fn build_producer_options(&self) -> ProducerOptions {
333 let mut opts = ProducerOptions {
334 encrypted: None,
335 access_mode: Some(0),
336 metadata: Default::default(),
337 schema: None,
338 batch_size: self.batch.max_events,
339 batch_byte_size: self.batch.max_bytes,
340 compression: None,
341 batch_timeout: Default::default(),
342 block_queue_if_full: Default::default(),
343 routing_policy: Default::default(),
344 };
345
346 match &self.compression {
347 PulsarCompression::None => opts.compression = Some(compression::Compression::None),
348 PulsarCompression::Lz4 => {
349 opts.compression = Some(compression::Compression::Lz4(
350 compression::CompressionLz4::default(),
351 ))
352 }
353 PulsarCompression::Zlib => {
354 opts.compression = Some(compression::Compression::Zlib(
355 compression::CompressionZlib::default(),
356 ))
357 }
358 PulsarCompression::Zstd => {
359 opts.compression = Some(compression::Compression::Zstd(
360 compression::CompressionZstd::default(),
361 ))
362 }
363 PulsarCompression::Snappy => {
364 opts.compression = Some(compression::Compression::Snappy(
365 compression::CompressionSnappy::default(),
366 ))
367 }
368 }
369
370 if let SerializerConfig::Avro { avro } = self.encoding.config() {
371 opts.schema = Some(proto::Schema {
372 schema_data: avro.schema.as_bytes().into(),
373 r#type: proto::schema::Type::Avro as i32,
374 ..Default::default()
375 });
376 }
377 opts
378 }
379}
380
381impl GenerateConfig for PulsarSinkConfig {
382 fn generate_config() -> toml::Value {
383 toml::Value::try_from(Self::default()).unwrap()
384 }
385}
386
387#[async_trait::async_trait]
388#[typetag::serde(name = "pulsar")]
389impl SinkConfig for PulsarSinkConfig {
390 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
391 let client = self
392 .create_pulsar_client()
393 .await
394 .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
395
396 let sink = PulsarSink::new(client, self.clone())?;
397 let hc = healthcheck(self.clone()).boxed();
398
399 Ok((VectorSink::from_event_streamsink(sink), hc))
400 }
401
402 fn input(&self) -> Input {
403 let requirement =
404 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
405
406 Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
407 .with_schema_requirement(requirement)
408 }
409
410 fn acknowledgements(&self) -> &AcknowledgementsConfig {
411 &self.acknowledgements
412 }
413}