1use chrono::TimeZone;
6use futures_util::StreamExt;
7use pulsar::{
8 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
9 consumer::Message,
10 message::proto::MessageIdData,
11 Authentication, Consumer, Pulsar, SubType, TokioExecutor,
12};
13use std::path::Path;
14use tokio_util::codec::FramedRead;
15
16use vector_lib::{
17 codecs::{
18 decoding::{DeserializerConfig, FramingConfig},
19 StreamDecodingError,
20 },
21 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
22 configurable::configurable_component,
23 event::Event,
24 finalization::BatchStatus,
25 finalizer::OrderedFinalizer,
26 internal_event::{
27 ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
28 Registered,
29 },
30 sensitive_string::SensitiveString,
31 shutdown::ShutdownSignal,
32 EstimatedJsonEncodedSizeOf,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37 codecs::{Decoder, DecodingConfig},
38 config::{SourceConfig, SourceContext},
39 event::BatchNotifier,
40 internal_events::{
41 PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
42 },
43 serde::{bool_or_struct, default_decoding, default_framing_message_based},
44 SourceSender,
45};
46
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
/// Configuration for the `pulsar` source.
///
/// The source builds a single consumer with a `Shared` subscription over all of
/// `topics` and decodes each message payload using `framing` and `decoding`.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint to which the Pulsar client should connect.
    ///
    /// Include the scheme and port, for example `pulsar://127.0.0.1:6650`, or a
    /// `pulsar+ssl://` endpoint together with `tls`. `address` is accepted as an alias.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topic names to read events from.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The name of the consumer.
    ///
    /// When unset, no name is passed to the consumer builder and the client's default applies.
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The name of the subscription.
    ///
    /// When unset, no name is passed to the consumer builder and the client's default applies.
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer's priority level.
    ///
    /// Forwarded as the consumer's `priority_level` option. The subscription type is
    /// always `Shared`; see the Pulsar documentation for how the broker uses this
    /// value when dispatching to consumers of a shared subscription.
    priority_level: Option<i32>,

    /// The consumer batch size.
    ///
    /// Forwarded to the consumer builder's `with_batch_size`; when unset, the client
    /// default applies.
    batch_size: Option<u32>,

    // Optional authentication settings (basic name/token or OAuth2).
    #[configurable(derived)]
    auth: Option<AuthConfig>,

    // Optional dead letter queue policy, forwarded to the consumer builder.
    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    // Framing applied to each message payload; the default comes from
    // `default_framing_message_based`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    // Decoding applied to each framed payload; the default comes from `default_decoding`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // End-to-end acknowledgement settings (accepts a bool or a struct). When enabled,
    // a message is acked/nacked only after its events are finalized; otherwise it is
    // acked as soon as its events have been sent downstream.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Optional TLS settings for the Pulsar client connection.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
109
#[configurable_component]
/// Authentication configuration.
///
/// This enum is untagged, so the variant is chosen from the fields present:
/// `name` + `token` selects `Basic`, and `oauth2` selects `OAuth`.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Basic authentication.
    Basic {
        /// The authentication method name, sent as `Authentication::name`.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// The authentication data, sent as the UTF-8 bytes in `Authentication::data`.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth authentication.
    OAuth {
        // OAuth2 client-credentials settings.
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
140
#[configurable_component]
/// OAuth2-specific authentication configuration.
///
/// All fields are forwarded to `OAuth2Params` for the client-credentials flow.
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The credentials URL.
    ///
    /// The examples include a `file://` URL and a base64 `data:` URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience, if any.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope, if any.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
168
#[configurable_component]
/// Dead Letter Queue policy configuration.
///
/// Forwarded to the consumer builder as a `pulsar::consumer::DeadLetterPolicy`;
/// the Pulsar client enforces the policy.
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum redelivery count for a message before it is routed to `dead_letter_topic`.
    pub max_redeliver_count: usize,

    /// Name of the topic that failing messages are sent to.
    pub dead_letter_topic: String,
}
179
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// File path to the CA certificate chain (the example is a PEM file).
    ///
    /// Loaded with the client builder's `with_certificate_chain_file`.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Defaults to `true` when unset. Setting it to `false` allows insecure connections;
    /// do NOT do this unless you understand the risks.
    pub verify_certificate: Option<bool>,

    /// Enables TLS hostname verification.
    ///
    /// Defaults to `true` when unset.
    pub verify_hostname: Option<bool>,
}
198
/// Identifies one Pulsar message whose events are awaiting finalization.
///
/// Holds the topic and message id needed to ack or nack the message once its
/// batch status is known.
#[derive(Debug)]
struct FinalizerEntry {
    topic: String,
    message_id: MessageIdData,
}
204
// Config generation built from `PulsarSourceConfig`'s `Default` impl (going by the
// macro's name); exercised by the `generate_config` unit test.
impl_generate_config_from_default!(PulsarSourceConfig);
206
207#[async_trait::async_trait]
208#[typetag::serde(name = "pulsar")]
209impl SourceConfig for PulsarSourceConfig {
210 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
211 let log_namespace = cx.log_namespace(self.log_namespace);
212
213 let consumer = self.create_consumer().await?;
214 let decoder =
215 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
216 .build()?;
217 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
218
219 Ok(Box::pin(pulsar_source(
220 consumer,
221 decoder,
222 cx.shutdown,
223 cx.out,
224 acknowledgements,
225 log_namespace,
226 )))
227 }
228
229 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
230 let log_namespace = global_log_namespace.merge(self.log_namespace);
231
232 let schema_definition = self
233 .decoding
234 .schema_definition(log_namespace)
235 .with_standard_vector_source_metadata()
236 .with_source_metadata(
237 Self::NAME,
238 Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
239 &owned_value_path!("publish_time"),
240 Kind::timestamp(),
241 Some("publish_time"),
242 )
243 .with_source_metadata(
244 Self::NAME,
245 Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
246 &owned_value_path!("topic"),
247 Kind::bytes(),
248 Some("topic"),
249 )
250 .with_source_metadata(
251 Self::NAME,
252 Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
253 &owned_value_path!("producer_name"),
254 Kind::bytes(),
255 Some("producer_name"),
256 );
257 vec![SourceOutput::new_maybe_logs(
258 self.decoding.output_type(),
259 schema_definition,
260 )]
261 }
262
263 fn can_acknowledge(&self) -> bool {
264 true
265 }
266}
267
268impl PulsarSourceConfig {
269 async fn create_consumer(
270 &self,
271 ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
272 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
273
274 if let Some(auth) = &self.auth {
275 builder = match auth {
276 AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
277 name: name.clone(),
278 data: token.inner().as_bytes().to_vec(),
279 }),
280 AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
281 OAuth2Authentication::client_credentials(OAuth2Params {
282 issuer_url: oauth2.issuer_url.clone(),
283 credentials_url: oauth2.credentials_url.clone(),
284 audience: oauth2.audience.clone(),
285 scope: oauth2.scope.clone(),
286 }),
287 ),
288 };
289 }
290 if let Some(options) = &self.tls {
291 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
292 builder =
293 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
294 builder = builder
295 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
296 }
297
298 let pulsar = builder.build().await?;
299
300 let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
301 .consumer()
302 .with_topics(&self.topics)
303 .with_subscription_type(SubType::Shared)
304 .with_options(pulsar::consumer::ConsumerOptions {
305 priority_level: self.priority_level,
306 ..Default::default()
307 });
308
309 if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
310 consumer_builder =
311 consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
312 max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
313 dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
314 });
315 }
316
317 if let Some(batch_size) = self.batch_size {
318 consumer_builder = consumer_builder.with_batch_size(batch_size);
319 }
320 if let Some(consumer_name) = &self.consumer_name {
321 consumer_builder = consumer_builder.with_consumer_name(consumer_name);
322 }
323 if let Some(subscription_name) = &self.subscription_name {
324 consumer_builder = consumer_builder.with_subscription(subscription_name);
325 }
326
327 let consumer = consumer_builder.build::<String>().await?;
328
329 Ok(consumer)
330 }
331}
332
333async fn pulsar_source(
334 mut consumer: Consumer<String, TokioExecutor>,
335 decoder: Decoder,
336 mut shutdown: ShutdownSignal,
337 mut out: SourceSender,
338 acknowledgements: bool,
339 log_namespace: LogNamespace,
340) -> Result<(), ()> {
341 let (finalizer, mut ack_stream) =
342 OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
343
344 let bytes_received = register!(BytesReceived::from(Protocol::TCP));
345 let events_received = register!(EventsReceived);
346 let pulsar_error_events = register!(PulsarErrorEvent);
347
348 loop {
349 tokio::select! {
350 _ = &mut shutdown => break,
351 entry = ack_stream.next() => {
352 if let Some((status, entry)) = entry {
353 handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
354 }
355 },
356 Some(maybe_message) = consumer.next() => {
357 match maybe_message {
358 Ok(msg) => {
359 bytes_received.emit(ByteSize(msg.payload.data.len()));
360 parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
361 }
362 Err(error) => {
363 pulsar_error_events.emit(PulsarErrorEventData{
364 msg: error.to_string(),
365 error_type:PulsarErrorEventType::Read,
366 });
367 }
368 }
369 },
370 }
371 }
372
373 Ok(())
374}
375
376#[allow(clippy::too_many_arguments)]
377async fn parse_message(
378 msg: Message<String>,
379 decoder: &Decoder,
380 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
381 out: &mut SourceSender,
382 consumer: &mut Consumer<String, TokioExecutor>,
383 log_namespace: LogNamespace,
384 events_received: &Registered<EventsReceived>,
385 pulsar_error_events: &Registered<PulsarErrorEvent>,
386) {
387 let publish_time = i64::try_from(msg.payload.metadata.publish_time)
388 .ok()
389 .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
390 let topic = msg.topic.clone();
391 let producer_name = msg.payload.metadata.producer_name.clone();
392
393 let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
394 let stream = async_stream::stream! {
395 while let Some(next) = stream.next().await {
396 match next {
397 Ok((events, _byte_size)) => {
398 events_received.emit(CountByteSize(
399 events.len(),
400 events.estimated_json_encoded_size_of(),
401 ));
402
403 let now = chrono::Utc::now();
404
405 let events = events.into_iter().map(|mut event| {
406 if let Event::Log(ref mut log) = event {
407 log_namespace.insert_standard_vector_source_metadata(
408 log,
409 PulsarSourceConfig::NAME,
410 now,
411 );
412
413 log_namespace.insert_source_metadata(
414 PulsarSourceConfig::NAME,
415 log,
416 Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
417 path!("publish_time"),
418 publish_time,
419 );
420
421 log_namespace.insert_source_metadata(
422 PulsarSourceConfig::NAME,
423 log,
424 Some(LegacyKey::InsertIfEmpty(path!("topic"))),
425 path!("topic"),
426 topic.clone(),
427 );
428
429 log_namespace.insert_source_metadata(
430 PulsarSourceConfig::NAME,
431 log,
432 Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
433 path!("producer_name"),
434 producer_name.clone(),
435 );
436 }
437 event
438 });
439
440 for event in events {
441 yield event;
442 }
443 }
444 Err(error) => {
445 if !error.can_continue() {
448 break;
449 }
450 }
451 }
452 }
453 }
454 .boxed();
455
456 finalize_event_stream(
457 consumer,
458 finalizer,
459 out,
460 stream,
461 msg.topic.clone(),
462 msg.message_id().clone(),
463 pulsar_error_events,
464 )
465 .await;
466}
467
468async fn finalize_event_stream(
470 consumer: &mut Consumer<String, TokioExecutor>,
471 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
472 out: &mut SourceSender,
473 mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
474 topic: String,
475 message_id: MessageIdData,
476 pulsar_error_events: &Registered<PulsarErrorEvent>,
477) {
478 match finalizer {
479 Some(finalizer) => {
480 let (batch, receiver) = BatchNotifier::new_with_receiver();
481 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
482
483 match out.send_event_stream(&mut stream).await {
484 Err(_error) => {
485 emit!(StreamClosedError { count: 1 });
486 }
487 Ok(_) => {
488 finalizer.add(FinalizerEntry { topic, message_id }, receiver);
489 }
490 }
491 }
492 None => match out.send_event_stream(&mut stream).await {
493 Err(_error) => {
494 emit!(StreamClosedError { count: 1 });
495 }
496 Ok(_) => {
497 if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
498 pulsar_error_events.emit(PulsarErrorEventData {
499 msg: error.to_string(),
500 error_type: PulsarErrorEventType::Ack,
501 });
502 }
503 }
504 },
505 }
506}
507
508async fn handle_ack(
509 consumer: &mut Consumer<String, TokioExecutor>,
510 status: BatchStatus,
511 entry: FinalizerEntry,
512 pulsar_error_events: &Registered<PulsarErrorEvent>,
513) {
514 match status {
515 BatchStatus::Delivered => {
516 if let Err(error) = consumer
517 .ack_with_id(entry.topic.as_str(), entry.message_id)
518 .await
519 {
520 pulsar_error_events.emit(PulsarErrorEventData {
521 msg: error.to_string(),
522 error_type: PulsarErrorEventType::Ack,
523 });
524 }
525 }
526 BatchStatus::Errored | BatchStatus::Rejected => {
527 if let Err(error) = consumer
528 .nack_with_id(entry.topic.as_str(), entry.message_id)
529 .await
530 {
531 pulsar_error_events.emit(PulsarErrorEventData {
532 msg: error.to_string(),
533 error_type: PulsarErrorEventType::NAck,
534 });
535 }
536 }
537 }
538}
539
#[cfg(test)]
mod tests {
    use crate::{sources::pulsar::PulsarSourceConfig, test_util::test_generate_config};

    /// Runs the shared config-generation check for this source's configuration type.
    #[test]
    fn generate_config() {
        test_generate_config::<PulsarSourceConfig>();
    }
}
549
// Integration tests against a live Pulsar broker, compiled only with the
// `pulsar-integration-tests` feature. Plain connections use port 6650 and TLS
// connections use port 6651 on the host given by `PULSAR_HOST`.
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::config::log_schema;
    use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
    use crate::test_util::{collect_n, random_string, trace_init};
    use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;

    /// Broker host from the `PULSAR_HOST` environment variable, defaulting to `127.0.0.1`.
    fn pulsar_host() -> String {
        std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
    }

    /// Formats a broker URL as `{scheme}://{host}:{port}`.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        format!("{}://{}:{}", scheme, pulsar_host(), port)
    }

    // Plain connection, acknowledgements enabled, legacy namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    // Plain connection, acknowledgements enabled, Vector namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    // Plain connection, acknowledgements disabled, legacy namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    // Plain connection, acknowledgements disabled, Vector namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    // TLS connection using the test intermediate CA. Both verification flags are left
    // unset, which the client setup treats as enabled.
    #[tokio::test]
    async fn consumes_event_with_tls() {
        pulsar_send_receive(
            &pulsar_address("pulsar+ssl", 6651),
            false,
            LogNamespace::Vector,
            Some(TlsOptions {
                ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
                verify_certificate: None,
                verify_hostname: None,
            }),
        )
        .await;
    }

    /// Round trip: publishes one message to a fresh random topic with a separate
    /// producer and asserts the source emits it as the first event's message.
    ///
    /// `acknowledgements` and `log_namespace` are passed to `pulsar_source`. `tls`,
    /// when set, configures both the source's consumer and the test producer's client.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };
        // Mirror the source's TLS setup for the producer-side client.
        let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &tls {
            builder = builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap();
            builder =
                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
            builder = builder
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
        }

        let pulsar = builder.build().await.unwrap();

        // The consumer is created before anything is published; presumably this ensures
        // the subscription exists when the message is sent. TODO confirm.
        let consumer = cnf.create_consumer().await.unwrap();
        // NOTE(review): the decoder always uses `LogNamespace::Legacy`, regardless of the
        // `log_namespace` under test, presumably so that the payload lands at the legacy
        // message key read by the assertion below. `log_namespace` only affects the
        // metadata that `pulsar_source` inserts.
        let decoder = DecodingConfig::new(
            cnf.framing.clone(),
            cnf.decoding.clone(),
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        // The source runs on a spawned task with a no-op shutdown signal; the test only
        // waits for the first emitted event.
        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            tokio::spawn(pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            ));
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            msg.into()
        );
    }
}