1use std::{path::Path, time::Duration};
2
3use futures_util::{FutureExt, TryFutureExt};
4use pulsar::{
5 Authentication, ConnectionRetryOptions, Error as PulsarError, OperationRetryOptions,
6 ProducerOptions, Pulsar, TokioExecutor,
7 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
8 compression,
9 error::AuthenticationError,
10 message::proto,
11};
12use vector_lib::{
13 codecs::{TextSerializerConfig, encoding::SerializerConfig},
14 config::DataType,
15 lookup::lookup_v2::OptionalTargetPath,
16 sensitive_string::SensitiveString,
17};
18use vrl::value::Kind;
19
20use crate::{
21 schema,
22 sinks::{
23 prelude::*,
24 pulsar::sink::{PulsarSink, healthcheck},
25 },
26};
27
/// Configuration for the `pulsar` sink.
#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
#[derive(Clone, Debug)]
pub struct PulsarSinkConfig {
    /// The endpoint to which the Pulsar client connects.
    ///
    /// The key `address` is accepted as an alias.
    #[serde(alias = "address")]
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    pub(crate) endpoint: String,

    // NOTE(review): the topic is not read in this file; how the template is rendered per event
    // is decided by the sink implementation — confirm there.
    /// The Pulsar topic to publish events to, given as a template.
    #[configurable(metadata(docs::examples = "topic-1234"))]
    pub(crate) topic: Template,

    // NOTE(review): not read in this file; the behavior when unset is decided where the
    // producer is created — confirm there.
    /// Optional name for the Pulsar producer.
    #[configurable(metadata(docs::examples = "producer-name"))]
    pub(crate) producer_name: Option<String>,

    // NOTE(review): not read in this file; presumably resolved against each event by the sink
    // to obtain the message partition key — confirm.
    /// Optional path of the event field to use as the partition key.
    #[configurable(metadata(docs::examples = "message"))]
    #[configurable(metadata(docs::examples = "my_field"))]
    pub(crate) partition_key_field: Option<OptionalTargetPath>,

    // NOTE(review): not read in this file; presumably resolved against each event by the sink
    // to obtain the Pulsar message properties — confirm.
    /// Optional path of the event field to use for the Pulsar message properties.
    pub properties_key: Option<OptionalTargetPath>,

    // Batching limits; forwarded to the producer options as `batch_size` and
    // `batch_byte_size` by `build_producer_options`.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) batch: PulsarBatchConfig,

    // Compression selection; mapped onto the Pulsar client's compression type by
    // `build_producer_options`. Defaults to `PulsarCompression::None`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: PulsarCompression,

    // Event encoding. Its serializer determines the accepted input types (`input`) and, for
    // Avro, the schema attached to the producer options (`build_producer_options`).
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Optional authentication; must be either `name` + `token` or `oauth2`
    // (validated in `create_pulsar_client`).
    #[configurable(derived)]
    pub(crate) auth: Option<PulsarAuthConfig>,

    // Acknowledgement settings, exposed through `SinkConfig::acknowledgements`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Optional overrides for the client's connection retry behavior; unset values fall back
    // to `ConnectionRetryOptions::default()` in `create_pulsar_client`.
    #[configurable(derived)]
    #[serde(default)]
    pub connection_retry_options: Option<CustomConnectionRetryOptions>,

    // Optional TLS settings; applied to the client builder only when present.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) tls: Option<PulsarTlsOptions>,
}
93
/// Event batching behavior.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct PulsarBatchConfig {
    /// The maximum number of events in a batch.
    ///
    /// Forwarded to the Pulsar producer options as `batch_size`; left unset when omitted.
    #[configurable(metadata(docs::type_unit = "events"))]
    #[configurable(metadata(docs::examples = 1000))]
    pub max_events: Option<u32>,

    /// The maximum size of a batch, in bytes.
    ///
    /// Forwarded to the Pulsar producer options as `batch_byte_size`; left unset when omitted.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,
}
110
/// Authentication configuration.
///
/// Either `name` and `token` must both be set, or only `oauth2`; any other combination is
/// rejected when the client is created.
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct PulsarAuthConfig {
    /// The authentication name.
    ///
    /// Passed to the Pulsar client as the `name` of its `Authentication` value.
    /// Must be set together with `token`.
    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
    #[configurable(metadata(docs::examples = "name123"))]
    name: Option<String>,

    /// The authentication token.
    ///
    /// Its bytes are passed to the Pulsar client as the `data` of its `Authentication` value.
    /// Must be set together with `name`.
    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123456789"))]
    token: Option<SensitiveString>,

    // OAuth2 client-credentials settings; mutually exclusive with `name`/`token`.
    #[configurable(derived)]
    oauth2: Option<OAuth2Config>,
}
134
135#[configurable_component]
137#[derive(Clone, Debug)]
138pub struct OAuth2Config {
139 #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
141 #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
142 issuer_url: String,
143
144 #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
148 #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
149 #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
150 credentials_url: String,
151
152 #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
154 #[configurable(metadata(docs::examples = "pulsar"))]
155 audience: Option<String>,
156
157 #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
159 #[configurable(metadata(docs::examples = "admin"))]
160 scope: Option<String>,
161}
162
/// Supported compression types for Pulsar.
///
/// Serialized in lowercase. Each variant is mapped onto the matching variant of the Pulsar
/// client's compression type when the producer options are built.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum PulsarCompression {
    /// No compression (the default).
    #[derivative(Default)]
    None,

    /// LZ4.
    Lz4,

    /// Zlib.
    Zlib,

    /// Zstandard.
    Zstd,

    /// Snappy.
    Snappy,
}
185
// User-facing overrides for the Pulsar client's `ConnectionRetryOptions`. Every field is
// optional; an unset field falls back to the corresponding value of
// `ConnectionRetryOptions::default()` when the client is created.
#[configurable_component]
#[configurable(
    description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
    /// Minimum delay between connection retries.
    ///
    /// Converted with `Duration::from_millis` into the client's `min_backoff`.
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    pub min_backoff_ms: Option<u64>,

    /// Maximum delay between reconnection retries.
    ///
    /// Converted with `Duration::from_secs` into the client's `max_backoff`.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 30))]
    pub max_backoff_secs: Option<u64>,

    /// Maximum number of connection retries.
    ///
    /// Used as the client's `max_retries`.
    #[configurable(metadata(docs::examples = 12))]
    pub max_retries: Option<u32>,

    /// Time limit to establish a connection.
    ///
    /// Converted with `Duration::from_secs` into the client's `connection_timeout`.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 10))]
    pub connection_timeout_secs: Option<u64>,

    /// Keep-alive interval for each broker connection.
    ///
    /// Converted with `Duration::from_secs` into the client's `keep_alive`.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 60))]
    pub keep_alive_secs: Option<u64>,
}
215
// TLS settings applied to the Pulsar client builder when the sink's `tls` option is set.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
    /// File path containing a list of PEM encoded certificates.
    ///
    /// Passed to the client builder's `with_certificate_chain_file`.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Treated as `true` when unset. Setting it to `false` makes the client allow insecure
    /// connections.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled.
    ///
    /// Treated as `true` when unset.
    pub verify_hostname: Option<bool>,
}
234
235impl Default for PulsarSinkConfig {
236 fn default() -> Self {
237 Self {
238 endpoint: "pulsar://127.0.0.1:6650".to_string(),
239 topic: Template::try_from("topic-1234")
240 .expect("Unable to parse default template topic"),
241 producer_name: None,
242 properties_key: None,
243 partition_key_field: None,
244 batch: Default::default(),
245 compression: Default::default(),
246 encoding: TextSerializerConfig::default().into(),
247 auth: None,
248 acknowledgements: Default::default(),
249 connection_retry_options: None,
250 tls: None,
251 }
252 }
253}
254
255impl PulsarSinkConfig {
256 pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
257 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
258 if let Some(auth) = &self.auth {
259 builder = match (
260 auth.name.as_ref(),
261 auth.token.as_ref(),
262 auth.oauth2.as_ref(),
263 ) {
264 (Some(name), Some(token), None) => builder.with_auth(Authentication {
265 name: name.clone(),
266 data: token.inner().as_bytes().to_vec(),
267 }),
268 (None, None, Some(oauth2)) => builder.with_auth_provider(
269 OAuth2Authentication::client_credentials(OAuth2Params {
270 issuer_url: oauth2.issuer_url.clone(),
271 credentials_url: oauth2.credentials_url.clone(),
272 audience: oauth2.audience.clone(),
273 scope: oauth2.scope.clone(),
274 }),
275 ),
276 _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
277 "Invalid auth config: can only specify name and token or oauth2 configuration"
278 .to_string(),
279 ))))?,
280 };
281 }
282
283 let default_retry_options = ConnectionRetryOptions::default();
285 let retry_options =
286 self.connection_retry_options
287 .as_ref()
288 .map_or(default_retry_options.clone(), |opts| {
289 ConnectionRetryOptions {
290 min_backoff: opts
291 .min_backoff_ms
292 .map_or(default_retry_options.min_backoff, |ms| {
293 Duration::from_millis(ms)
294 }),
295 max_backoff: opts
296 .max_backoff_secs
297 .map_or(default_retry_options.max_backoff, |secs| {
298 Duration::from_secs(secs)
299 }),
300 max_retries: opts
301 .max_retries
302 .unwrap_or(default_retry_options.max_retries),
303 connection_timeout: opts
304 .connection_timeout_secs
305 .map_or(default_retry_options.connection_timeout, |secs| {
306 Duration::from_secs(secs)
307 }),
308 keep_alive: opts
309 .keep_alive_secs
310 .map_or(default_retry_options.keep_alive, |secs| {
311 Duration::from_secs(secs)
312 }),
313 }
314 });
315
316 builder = builder.with_connection_retry_options(retry_options);
317
318 let operation_retry_opts = OperationRetryOptions::default();
320 builder = builder.with_operation_retry_options(operation_retry_opts);
321
322 if let Some(options) = &self.tls {
323 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
324 builder =
325 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
326 builder = builder
327 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
328 }
329 builder.build().map_err(|e| e.into()).await
330 }
331
332 pub(crate) fn build_producer_options(&self) -> ProducerOptions {
333 let mut opts = ProducerOptions {
334 encrypted: None,
335 access_mode: Some(0),
336 metadata: Default::default(),
337 schema: None,
338 batch_size: self.batch.max_events,
339 batch_byte_size: self.batch.max_bytes,
340 compression: None,
341 };
342
343 match &self.compression {
344 PulsarCompression::None => opts.compression = Some(compression::Compression::None),
345 PulsarCompression::Lz4 => {
346 opts.compression = Some(compression::Compression::Lz4(
347 compression::CompressionLz4::default(),
348 ))
349 }
350 PulsarCompression::Zlib => {
351 opts.compression = Some(compression::Compression::Zlib(
352 compression::CompressionZlib::default(),
353 ))
354 }
355 PulsarCompression::Zstd => {
356 opts.compression = Some(compression::Compression::Zstd(
357 compression::CompressionZstd::default(),
358 ))
359 }
360 PulsarCompression::Snappy => {
361 opts.compression = Some(compression::Compression::Snappy(
362 compression::CompressionSnappy::default(),
363 ))
364 }
365 }
366
367 if let SerializerConfig::Avro { avro } = self.encoding.config() {
368 opts.schema = Some(proto::Schema {
369 schema_data: avro.schema.as_bytes().into(),
370 r#type: proto::schema::Type::Avro as i32,
371 ..Default::default()
372 });
373 }
374 opts
375 }
376}
377
378impl GenerateConfig for PulsarSinkConfig {
379 fn generate_config() -> toml::Value {
380 toml::Value::try_from(Self::default()).unwrap()
381 }
382}
383
384#[async_trait::async_trait]
385#[typetag::serde(name = "pulsar")]
386impl SinkConfig for PulsarSinkConfig {
387 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
388 let client = self
389 .create_pulsar_client()
390 .await
391 .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
392
393 let sink = PulsarSink::new(client, self.clone())?;
394 let hc = healthcheck(self.clone()).boxed();
395
396 Ok((VectorSink::from_event_streamsink(sink), hc))
397 }
398
399 fn input(&self) -> Input {
400 let requirement =
401 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
402
403 Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
404 .with_schema_requirement(requirement)
405 }
406
407 fn acknowledgements(&self) -> &AcknowledgementsConfig {
408 &self.acknowledgements
409 }
410}