1use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10 Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12 consumer::Message,
13 message::proto::MessageIdData,
14};
15use tokio_util::codec::FramedRead;
16use vector_lib::{
17 EstimatedJsonEncodedSizeOf,
18 codecs::{
19 Decoder, DecodingConfig, StreamDecodingError,
20 decoding::{DeserializerConfig, FramingConfig},
21 },
22 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
23 configurable::configurable_component,
24 event::Event,
25 finalization::BatchStatus,
26 finalizer::OrderedFinalizer,
27 internal_event::{
28 ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
29 Registered,
30 },
31 sensitive_string::SensitiveString,
32 shutdown::ShutdownSignal,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37 SourceSender,
38 config::{SourceConfig, SourceContext},
39 event::BatchNotifier,
40 internal_events::{
41 PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
42 },
43 serde::{bool_or_struct, default_decoding, default_framing_message_based},
44};
45
/// Configuration for the `pulsar` source.
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint of the Pulsar broker to connect to.
    ///
    /// The alias `address` is also accepted for this option.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topics to consume from.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The consumer name handed to the Pulsar consumer builder.
    ///
    /// When unset, no consumer name is configured on the builder.
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The subscription name handed to the Pulsar consumer builder.
    ///
    /// When unset, no subscription name is configured on the builder.
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer priority level, forwarded as-is to the Pulsar consumer options.
    priority_level: Option<i32>,

    /// The batch size handed to the Pulsar consumer builder.
    ///
    /// When unset, the builder's batch size is left untouched.
    batch_size: Option<u32>,

    /// Authentication settings. When unset, no authentication is configured on the client.
    #[configurable(derived)]
    auth: Option<AuthConfig>,

    /// Dead-letter queue policy applied to the consumer when set.
    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    /// Framing used to split each message payload into frames.
    ///
    /// Defaults to the value returned by `default_framing_message_based()`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    /// Decoding used to turn frames into events.
    ///
    /// Defaults to the value returned by `default_decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    /// Acknowledgement settings for this source, deserialized through `bool_or_struct`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. When set, it is merged over the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// TLS settings. When unset, no TLS options are configured on the client.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
108
/// Authentication configuration.
///
/// The enum is untagged: the variant is selected by which fields are present.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Authentication with a name and a token, sent to the broker as a Pulsar
    /// `Authentication` value.
    Basic {
        /// The authentication name, used as the `name` of the `Authentication` value.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// The authentication token; its bytes are used as the `data` of the
        /// `Authentication` value.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth2 authentication using the client-credentials provider.
    OAuth {
        /// OAuth2-specific authentication configuration.
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
139
/// OAuth2-specific authentication configuration.
///
/// Every field is copied verbatim into the Pulsar client's `OAuth2Params`.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The credentials URL.
    ///
    /// The examples show a `file://` URL and a `data:` URL with base64-encoded content.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience. Optional.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope. Optional.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
167
/// Dead-letter queue policy configuration.
///
/// Both fields are copied into the Pulsar client's `DeadLetterPolicy` when the
/// consumer is built.
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum redelivery count, forwarded to the client's dead-letter policy.
    pub max_redeliver_count: usize,

    /// Name of the dead-letter topic, forwarded to the client's dead-letter policy.
    pub dead_letter_topic: String,
}
178
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// Path to the certificate chain file handed to the Pulsar client builder.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Whether the server certificate is verified.
    ///
    /// When set to `false`, the client is told to allow insecure connections.
    /// Treated as `true` when unset.
    pub verify_certificate: Option<bool>,

    /// Whether TLS hostname verification is enabled on the client.
    ///
    /// Treated as `true` when unset.
    pub verify_hostname: Option<bool>,
}
197
/// Identifies one consumed Pulsar message whose events are awaiting
/// finalization, so that it can be acked or nacked once the batch status is
/// known.
#[derive(Debug)]
struct FinalizerEntry {
    /// Topic the message was received on.
    topic: String,
    /// Pulsar message id used for the later ack/nack call.
    message_id: MessageIdData,
}
203
// NOTE(review): the macro is defined elsewhere in the crate; judging by its
// name it wires example-config generation for `PulsarSourceConfig` to its
// `Default` implementation — confirm against the macro definition.
impl_generate_config_from_default!(PulsarSourceConfig);
205
206#[async_trait::async_trait]
207#[typetag::serde(name = "pulsar")]
208impl SourceConfig for PulsarSourceConfig {
209 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
210 let log_namespace = cx.log_namespace(self.log_namespace);
211
212 let consumer = self.create_consumer().await?;
213 let decoder =
214 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
215 .build()?;
216 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
217
218 Ok(Box::pin(pulsar_source(
219 consumer,
220 decoder,
221 cx.shutdown,
222 cx.out,
223 acknowledgements,
224 log_namespace,
225 )))
226 }
227
228 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
229 let log_namespace = global_log_namespace.merge(self.log_namespace);
230
231 let schema_definition = self
232 .decoding
233 .schema_definition(log_namespace)
234 .with_standard_vector_source_metadata()
235 .with_source_metadata(
236 Self::NAME,
237 Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
238 &owned_value_path!("publish_time"),
239 Kind::timestamp(),
240 Some("publish_time"),
241 )
242 .with_source_metadata(
243 Self::NAME,
244 Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
245 &owned_value_path!("topic"),
246 Kind::bytes(),
247 Some("topic"),
248 )
249 .with_source_metadata(
250 Self::NAME,
251 Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
252 &owned_value_path!("producer_name"),
253 Kind::bytes(),
254 Some("producer_name"),
255 );
256 vec![SourceOutput::new_maybe_logs(
257 self.decoding.output_type(),
258 schema_definition,
259 )]
260 }
261
262 fn can_acknowledge(&self) -> bool {
263 true
264 }
265}
266
267impl PulsarSourceConfig {
268 async fn create_consumer(
269 &self,
270 ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
271 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
272
273 if let Some(auth) = &self.auth {
274 builder = match auth {
275 AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
276 name: name.clone(),
277 data: token.inner().as_bytes().to_vec(),
278 }),
279 AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
280 OAuth2Authentication::client_credentials(OAuth2Params {
281 issuer_url: oauth2.issuer_url.clone(),
282 credentials_url: oauth2.credentials_url.clone(),
283 audience: oauth2.audience.clone(),
284 scope: oauth2.scope.clone(),
285 }),
286 ),
287 };
288 }
289 if let Some(options) = &self.tls {
290 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
291 builder =
292 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
293 builder = builder
294 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
295 }
296
297 let pulsar = builder.build().await?;
298
299 let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
300 .consumer()
301 .with_topics(&self.topics)
302 .with_subscription_type(SubType::Shared)
303 .with_options(pulsar::consumer::ConsumerOptions {
304 priority_level: self.priority_level,
305 ..Default::default()
306 });
307
308 if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
309 consumer_builder =
310 consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
311 max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
312 dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
313 });
314 }
315
316 if let Some(batch_size) = self.batch_size {
317 consumer_builder = consumer_builder.with_batch_size(batch_size);
318 }
319 if let Some(consumer_name) = &self.consumer_name {
320 consumer_builder = consumer_builder.with_consumer_name(consumer_name);
321 }
322 if let Some(subscription_name) = &self.subscription_name {
323 consumer_builder = consumer_builder.with_subscription(subscription_name);
324 }
325
326 let consumer = consumer_builder.build::<String>().await?;
327
328 Ok(consumer)
329 }
330}
331
332async fn pulsar_source(
333 mut consumer: Consumer<String, TokioExecutor>,
334 decoder: Decoder,
335 mut shutdown: ShutdownSignal,
336 mut out: SourceSender,
337 acknowledgements: bool,
338 log_namespace: LogNamespace,
339) -> Result<(), ()> {
340 let (finalizer, mut ack_stream) =
341 OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
342
343 let bytes_received = register!(BytesReceived::from(Protocol::TCP));
344 let events_received = register!(EventsReceived);
345 let pulsar_error_events = register!(PulsarErrorEvent);
346
347 loop {
348 tokio::select! {
349 _ = &mut shutdown => break,
350 entry = ack_stream.next() => {
351 if let Some((status, entry)) = entry {
352 handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
353 }
354 },
355 Some(maybe_message) = consumer.next() => {
356 match maybe_message {
357 Ok(msg) => {
358 bytes_received.emit(ByteSize(msg.payload.data.len()));
359 parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
360 }
361 Err(error) => {
362 pulsar_error_events.emit(PulsarErrorEventData{
363 msg: error.to_string(),
364 error_type:PulsarErrorEventType::Read,
365 });
366 }
367 }
368 },
369 }
370 }
371
372 Ok(())
373}
374
375#[allow(clippy::too_many_arguments)]
376async fn parse_message(
377 msg: Message<String>,
378 decoder: &Decoder,
379 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
380 out: &mut SourceSender,
381 consumer: &mut Consumer<String, TokioExecutor>,
382 log_namespace: LogNamespace,
383 events_received: &Registered<EventsReceived>,
384 pulsar_error_events: &Registered<PulsarErrorEvent>,
385) {
386 let publish_time = i64::try_from(msg.payload.metadata.publish_time)
387 .ok()
388 .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
389 let topic = msg.topic.clone();
390 let producer_name = msg.payload.metadata.producer_name.clone();
391
392 let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
393 let stream = async_stream::stream! {
394 while let Some(next) = stream.next().await {
395 match next {
396 Ok((events, _byte_size)) => {
397 events_received.emit(CountByteSize(
398 events.len(),
399 events.estimated_json_encoded_size_of(),
400 ));
401
402 let now = chrono::Utc::now();
403
404 let events = events.into_iter().map(|mut event| {
405 if let Event::Log(ref mut log) = event {
406 log_namespace.insert_standard_vector_source_metadata(
407 log,
408 PulsarSourceConfig::NAME,
409 now,
410 );
411
412 log_namespace.insert_source_metadata(
413 PulsarSourceConfig::NAME,
414 log,
415 Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
416 path!("publish_time"),
417 publish_time,
418 );
419
420 log_namespace.insert_source_metadata(
421 PulsarSourceConfig::NAME,
422 log,
423 Some(LegacyKey::InsertIfEmpty(path!("topic"))),
424 path!("topic"),
425 topic.clone(),
426 );
427
428 log_namespace.insert_source_metadata(
429 PulsarSourceConfig::NAME,
430 log,
431 Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
432 path!("producer_name"),
433 producer_name.clone(),
434 );
435 }
436 event
437 });
438
439 for event in events {
440 yield event;
441 }
442 }
443 Err(error) => {
444 if !error.can_continue() {
447 break;
448 }
449 }
450 }
451 }
452 }
453 .boxed();
454
455 finalize_event_stream(
456 consumer,
457 finalizer,
458 out,
459 stream,
460 msg.topic.clone(),
461 msg.message_id().clone(),
462 pulsar_error_events,
463 )
464 .await;
465}
466
467async fn finalize_event_stream(
469 consumer: &mut Consumer<String, TokioExecutor>,
470 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
471 out: &mut SourceSender,
472 mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
473 topic: String,
474 message_id: MessageIdData,
475 pulsar_error_events: &Registered<PulsarErrorEvent>,
476) {
477 match finalizer {
478 Some(finalizer) => {
479 let (batch, receiver) = BatchNotifier::new_with_receiver();
480 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
481
482 match out.send_event_stream(&mut stream).await {
483 Err(_error) => {
484 emit!(StreamClosedError { count: 1 });
485 }
486 Ok(_) => {
487 finalizer.add(FinalizerEntry { topic, message_id }, receiver);
488 }
489 }
490 }
491 None => match out.send_event_stream(&mut stream).await {
492 Err(_error) => {
493 emit!(StreamClosedError { count: 1 });
494 }
495 Ok(_) => {
496 if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
497 pulsar_error_events.emit(PulsarErrorEventData {
498 msg: error.to_string(),
499 error_type: PulsarErrorEventType::Ack,
500 });
501 }
502 }
503 },
504 }
505}
506
507async fn handle_ack(
508 consumer: &mut Consumer<String, TokioExecutor>,
509 status: BatchStatus,
510 entry: FinalizerEntry,
511 pulsar_error_events: &Registered<PulsarErrorEvent>,
512) {
513 match status {
514 BatchStatus::Delivered => {
515 if let Err(error) = consumer
516 .ack_with_id(entry.topic.as_str(), entry.message_id)
517 .await
518 {
519 pulsar_error_events.emit(PulsarErrorEventData {
520 msg: error.to_string(),
521 error_type: PulsarErrorEventType::Ack,
522 });
523 }
524 }
525 BatchStatus::Errored | BatchStatus::Rejected => {
526 if let Err(error) = consumer
527 .nack_with_id(entry.topic.as_str(), entry.message_id)
528 .await
529 {
530 pulsar_error_events.emit(PulsarErrorEventData {
531 msg: error.to_string(),
532 error_type: PulsarErrorEventType::NAck,
533 });
534 }
535 }
536 }
537}
538
#[cfg(test)]
mod tests {
    use super::PulsarSourceConfig;

    /// Runs the crate's generic config-generation check against the Pulsar
    /// source configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PulsarSourceConfig>();
    }
}
548
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the broker under test: `PULSAR_HOST` when set, otherwise loopback.
    fn pulsar_host() -> String {
        match std::env::var("PULSAR_HOST") {
            Ok(host) => host,
            Err(_) => String::from("127.0.0.1"),
        }
    }

    /// Formats `<scheme>://<host>:<port>` for the broker under test.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        let host = pulsar_host();
        format!("{scheme}://{host}:{port}")
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Legacy, None).await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Vector, None).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Legacy, None).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, None).await;
    }

    #[tokio::test]
    async fn consumes_event_with_tls() {
        let endpoint = pulsar_address("pulsar+ssl", 6651);
        let tls = TlsOptions {
            ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
            verify_certificate: None,
            verify_hostname: None,
        };
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, Some(tls)).await;
    }

    /// Round-trip check: subscribes the source's consumer to a fresh random
    /// topic, publishes one message to it, runs `pulsar_source`, and asserts
    /// that the first collected event carries the published text under the
    /// log schema's message key.
    ///
    /// The consumer is created before the producer sends the message.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        let config = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };

        // Separate client used only for producing the test message; it gets
        // the same TLS treatment as the source's own client.
        let mut client_builder = Pulsar::<TokioExecutor>::builder(&config.endpoint, TokioExecutor);
        if let Some(tls) = tls.as_ref() {
            client_builder = client_builder
                .with_certificate_chain_file(Path::new(&tls.ca_file))
                .unwrap()
                .with_allow_insecure_connection(!tls.verify_certificate.unwrap_or(true))
                .with_tls_hostname_verification_enabled(tls.verify_hostname.unwrap_or(true));
        }
        let client = client_builder.build().await.unwrap();

        let consumer = config.create_consumer().await.unwrap();
        let decoding_config = DecodingConfig::new(
            config.framing.clone(),
            config.decoding.clone(),
            LogNamespace::Legacy,
        );
        let decoder = decoding_config.build().unwrap();

        let mut producer = client.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            let source = pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            );
            tokio::spawn(source);
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(events[0].as_log()[message_key], msg.into());
    }
}