1use crate::{
2 schema,
3 sinks::{
4 prelude::*,
5 pulsar::sink::{healthcheck, PulsarSink},
6 },
7};
8use futures_util::{FutureExt, TryFutureExt};
9use pulsar::{
10 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
11 compression,
12 message::proto,
13 Authentication, ConnectionRetryOptions, Error as PulsarError, ProducerOptions, Pulsar,
14 TokioExecutor,
15};
16use pulsar::{error::AuthenticationError, OperationRetryOptions};
17use std::path::Path;
18use std::time::Duration;
19use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig};
20use vector_lib::config::DataType;
21use vector_lib::lookup::lookup_v2::OptionalTargetPath;
22use vector_lib::sensitive_string::SensitiveString;
23use vrl::value::Kind;
24
/// Configuration for the `pulsar` sink.
#[configurable_component(sink("pulsar", "Publish observability events to Apache Pulsar topics."))]
#[derive(Clone, Debug)]
pub struct PulsarSinkConfig {
    /// The endpoint that the Pulsar client connects to.
    ///
    /// The option is also accepted under the name `address`.
    #[serde(alias = "address")]
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    pub(crate) endpoint: String,

    /// Template for the name of the Pulsar topic that events are published to.
    #[configurable(metadata(docs::examples = "topic-1234"))]
    pub(crate) topic: Template,

    /// Optional name for the Pulsar producer.
    // NOTE(review): the fallback used when this is unset is decided outside this file — confirm in the sink.
    #[configurable(metadata(docs::examples = "producer-name"))]
    pub(crate) producer_name: Option<String>,

    /// Optional path of the event field used as the partition key.
    // NOTE(review): how a missing field is handled is not visible here — confirm in the sink implementation.
    #[configurable(metadata(docs::examples = "message"))]
    #[configurable(metadata(docs::examples = "my_field"))]
    pub(crate) partition_key_field: Option<OptionalTargetPath>,

    /// Optional path of the event field used for the Pulsar message properties.
    // NOTE(review): the expected shape of that field is not visible here — confirm in the sink implementation.
    pub properties_key: Option<OptionalTargetPath>,

    /// Batching limits that are forwarded to the producer options.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) batch: PulsarBatchConfig,

    /// Compression codec that is forwarded to the producer options.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: PulsarCompression,

    /// Event encoding configuration.
    ///
    /// When the serializer is Avro, its schema is also attached to the producer options.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// Optional authentication settings for the Pulsar client.
    #[configurable(derived)]
    pub(crate) auth: Option<PulsarAuthConfig>,

    /// Acknowledgement settings for this sink.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Optional overrides for the client's connection retry options.
    ///
    /// Any value left unset falls back to the Pulsar client default.
    #[configurable(derived)]
    #[serde(default)]
    pub connection_retry_options: Option<CustomConnectionRetryOptions>,

    /// Optional TLS settings for the client connection.
    #[configurable(derived)]
    #[serde(default)]
    pub(crate) tls: Option<PulsarTlsOptions>,
}
90
/// Event batching behavior.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct PulsarBatchConfig {
    /// The maximum number of events in a batch.
    ///
    /// Forwarded to the producer options as `batch_size`.
    #[configurable(metadata(docs::type_unit = "events"))]
    #[configurable(metadata(docs::examples = 1000))]
    pub max_events: Option<u32>,

    /// The maximum size of a batch, in bytes.
    ///
    /// Forwarded to the producer options as `batch_byte_size`.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,
}
107
/// Authentication configuration.
///
/// Either `name` and `token` must be set together, or `oauth2` must be set on its own.
/// Any other combination is rejected when the client is created.
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct PulsarAuthConfig {
    /// Basic authentication name.
    ///
    /// Must be set together with `token` and cannot be combined with `oauth2`.
    #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
    #[configurable(metadata(docs::examples = "name123"))]
    name: Option<String>,

    /// Basic authentication token.
    ///
    /// Must be set together with `name` and cannot be combined with `oauth2`.
    /// Its bytes are sent as the authentication data.
    #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123456789"))]
    token: Option<SensitiveString>,

    /// OAuth2 client-credentials settings.
    ///
    /// Cannot be combined with `name` or `token`.
    #[configurable(derived)]
    oauth2: Option<OAuth2Config>,
}
131
132#[configurable_component]
134#[derive(Clone, Debug)]
135pub struct OAuth2Config {
136 #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
138 #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
139 issuer_url: String,
140
141 #[configurable(metadata(docs::examples = "{OAUTH2_CREDENTIALS_URL}"))]
145 #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
146 #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
147 credentials_url: String,
148
149 #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
151 #[configurable(metadata(docs::examples = "pulsar"))]
152 audience: Option<String>,
153
154 #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
156 #[configurable(metadata(docs::examples = "admin"))]
157 scope: Option<String>,
158}
159
/// Supported compression types for Pulsar.
///
/// Variant names are (de)serialized in lowercase. Each variant is mapped onto the
/// corresponding Pulsar client compression value using that codec's default settings.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum PulsarCompression {
    /// No compression. This is the default.
    #[derivative(Default)]
    None,

    /// LZ4 compression.
    Lz4,

    /// Zlib compression.
    Zlib,

    /// Zstandard compression.
    Zstd,

    /// Snappy compression.
    Snappy,
}
182
/// Optional overrides for the Pulsar client's connection retry options.
///
/// Every field left unset keeps the corresponding client default.
#[configurable_component]
#[configurable(
    description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
    /// Override for the client's `min_backoff`, in milliseconds.
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    pub min_backoff_ms: Option<u64>,

    /// Override for the client's `max_backoff`, in seconds.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 30))]
    pub max_backoff_secs: Option<u64>,

    /// Override for the client's `max_retries`.
    #[configurable(metadata(docs::examples = 12))]
    pub max_retries: Option<u32>,

    /// Override for the client's `connection_timeout`, in seconds.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 10))]
    pub connection_timeout_secs: Option<u64>,

    /// Override for the client's `keep_alive`, in seconds.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 60))]
    pub keep_alive_secs: Option<u64>,
}
212
/// TLS settings applied to the Pulsar client when present.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct PulsarTlsOptions {
    /// Path to the certificate chain file handed to the Pulsar client.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Whether the server certificate is verified.
    ///
    /// When set to `false`, the client is told to allow insecure connections.
    /// Treated as `true` when unset.
    pub verify_certificate: Option<bool>,

    /// Whether TLS hostname verification is enabled on the client.
    ///
    /// Treated as `true` when unset.
    pub verify_hostname: Option<bool>,
}
231
232impl Default for PulsarSinkConfig {
233 fn default() -> Self {
234 Self {
235 endpoint: "pulsar://127.0.0.1:6650".to_string(),
236 topic: Template::try_from("topic-1234")
237 .expect("Unable to parse default template topic"),
238 producer_name: None,
239 properties_key: None,
240 partition_key_field: None,
241 batch: Default::default(),
242 compression: Default::default(),
243 encoding: TextSerializerConfig::default().into(),
244 auth: None,
245 acknowledgements: Default::default(),
246 connection_retry_options: None,
247 tls: None,
248 }
249 }
250}
251
252impl PulsarSinkConfig {
253 pub(crate) async fn create_pulsar_client(&self) -> crate::Result<Pulsar<TokioExecutor>> {
254 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
255 if let Some(auth) = &self.auth {
256 builder = match (
257 auth.name.as_ref(),
258 auth.token.as_ref(),
259 auth.oauth2.as_ref(),
260 ) {
261 (Some(name), Some(token), None) => builder.with_auth(Authentication {
262 name: name.clone(),
263 data: token.inner().as_bytes().to_vec(),
264 }),
265 (None, None, Some(oauth2)) => builder.with_auth_provider(
266 OAuth2Authentication::client_credentials(OAuth2Params {
267 issuer_url: oauth2.issuer_url.clone(),
268 credentials_url: oauth2.credentials_url.clone(),
269 audience: oauth2.audience.clone(),
270 scope: oauth2.scope.clone(),
271 }),
272 ),
273 _ => return Err(Box::new(PulsarError::Authentication(AuthenticationError::Custom(
274 "Invalid auth config: can only specify name and token or oauth2 configuration"
275 .to_string(),
276 ))))?,
277 };
278 }
279
280 let default_retry_options = ConnectionRetryOptions::default();
282 let retry_options =
283 self.connection_retry_options
284 .as_ref()
285 .map_or(default_retry_options.clone(), |opts| {
286 ConnectionRetryOptions {
287 min_backoff: opts
288 .min_backoff_ms
289 .map_or(default_retry_options.min_backoff, |ms| {
290 Duration::from_millis(ms)
291 }),
292 max_backoff: opts
293 .max_backoff_secs
294 .map_or(default_retry_options.max_backoff, |secs| {
295 Duration::from_secs(secs)
296 }),
297 max_retries: opts
298 .max_retries
299 .unwrap_or(default_retry_options.max_retries),
300 connection_timeout: opts
301 .connection_timeout_secs
302 .map_or(default_retry_options.connection_timeout, |secs| {
303 Duration::from_secs(secs)
304 }),
305 keep_alive: opts
306 .keep_alive_secs
307 .map_or(default_retry_options.keep_alive, |secs| {
308 Duration::from_secs(secs)
309 }),
310 }
311 });
312
313 builder = builder.with_connection_retry_options(retry_options);
314
315 let operation_retry_opts = OperationRetryOptions::default();
317 builder = builder.with_operation_retry_options(operation_retry_opts);
318
319 if let Some(options) = &self.tls {
320 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
321 builder =
322 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
323 builder = builder
324 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
325 }
326 builder.build().map_err(|e| e.into()).await
327 }
328
329 pub(crate) fn build_producer_options(&self) -> ProducerOptions {
330 let mut opts = ProducerOptions {
331 encrypted: None,
332 access_mode: Some(0),
333 metadata: Default::default(),
334 schema: None,
335 batch_size: self.batch.max_events,
336 batch_byte_size: self.batch.max_bytes,
337 compression: None,
338 };
339
340 match &self.compression {
341 PulsarCompression::None => opts.compression = Some(compression::Compression::None),
342 PulsarCompression::Lz4 => {
343 opts.compression = Some(compression::Compression::Lz4(
344 compression::CompressionLz4::default(),
345 ))
346 }
347 PulsarCompression::Zlib => {
348 opts.compression = Some(compression::Compression::Zlib(
349 compression::CompressionZlib::default(),
350 ))
351 }
352 PulsarCompression::Zstd => {
353 opts.compression = Some(compression::Compression::Zstd(
354 compression::CompressionZstd::default(),
355 ))
356 }
357 PulsarCompression::Snappy => {
358 opts.compression = Some(compression::Compression::Snappy(
359 compression::CompressionSnappy::default(),
360 ))
361 }
362 }
363
364 if let SerializerConfig::Avro { avro } = self.encoding.config() {
365 opts.schema = Some(proto::Schema {
366 schema_data: avro.schema.as_bytes().into(),
367 r#type: proto::schema::Type::Avro as i32,
368 ..Default::default()
369 });
370 }
371 opts
372 }
373}
374
375impl GenerateConfig for PulsarSinkConfig {
376 fn generate_config() -> toml::Value {
377 toml::Value::try_from(Self::default()).unwrap()
378 }
379}
380
381#[async_trait::async_trait]
382#[typetag::serde(name = "pulsar")]
383impl SinkConfig for PulsarSinkConfig {
384 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
385 let client = self
386 .create_pulsar_client()
387 .await
388 .map_err(|e| super::sink::BuildError::CreatePulsarSink { source: e })?;
389
390 let sink = PulsarSink::new(client, self.clone())?;
391 let hc = healthcheck(self.clone()).boxed();
392
393 Ok((VectorSink::from_event_streamsink(sink), hc))
394 }
395
396 fn input(&self) -> Input {
397 let requirement =
398 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
399
400 Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
401 .with_schema_requirement(requirement)
402 }
403
404 fn acknowledgements(&self) -> &AcknowledgementsConfig {
405 &self.acknowledgements
406 }
407}