1use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10 Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12 consumer::Message,
13 message::proto::MessageIdData,
14};
15use tokio_util::codec::FramedRead;
16use vector_lib::{
17 EstimatedJsonEncodedSizeOf,
18 codecs::{
19 StreamDecodingError,
20 decoding::{DeserializerConfig, FramingConfig},
21 },
22 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
23 configurable::configurable_component,
24 event::Event,
25 finalization::BatchStatus,
26 finalizer::OrderedFinalizer,
27 internal_event::{
28 ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
29 Registered,
30 },
31 sensitive_string::SensitiveString,
32 shutdown::ShutdownSignal,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37 SourceSender,
38 codecs::{Decoder, DecodingConfig},
39 config::{SourceConfig, SourceContext},
40 event::BatchNotifier,
41 internal_events::{
42 PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
43 },
44 serde::{bool_or_struct, default_decoding, default_framing_message_based},
45};
46
/// Configuration for the `pulsar` source.
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint the Pulsar client connects to.
    ///
    /// The key `address` is accepted as an alias.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topics the consumer subscribes to.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The name given to the Pulsar consumer, when set.
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The name of the Pulsar subscription, when set.
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The value forwarded as the consumer's `priority_level` option.
    ///
    /// When unset, the option is passed through as `None`.
    priority_level: Option<i32>,

    /// The batch size forwarded to the consumer builder, when set.
    batch_size: Option<u32>,

    // Authentication settings; the client is built without authentication
    // when this is `None`.
    #[configurable(derived)]
    auth: Option<AuthConfig>,

    // Dead letter policy; only applied to the consumer builder when set.
    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    // Framing applied to each message payload; defaults to
    // `default_framing_message_based()`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    // Deserializer applied to each frame; defaults to `default_decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // Acknowledgement settings, deserialized through `bool_or_struct`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Per-source log namespace setting.
    ///
    /// It is merged with the global log namespace when the source's outputs
    /// are computed.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // TLS settings; the client builder is only given TLS options when this
    // is set.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
109
/// Authentication configuration.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Basic authentication.
    ///
    /// The name and token are handed to the client as an `Authentication`
    /// value, with the token sent as its raw bytes.
    Basic {
        /// The authentication name.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// The authentication token.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth2 authentication, using the client-credentials provider.
    OAuth {
        /// OAuth2-specific authentication configuration.
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
140
/// OAuth2-specific authentication configuration.
///
/// Every field is copied as-is into the Pulsar client's `OAuth2Params`.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The OAuth2 issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The URL of the OAuth2 credentials.
    ///
    /// The configured examples use `file://` and `data:` URLs.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience, when set.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope, when set.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
168
/// Dead letter queue policy configuration.
///
/// Both fields are copied into the consumer's
/// `pulsar::consumer::DeadLetterPolicy`.
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// The maximum redelivery count handed to the consumer's dead letter policy.
    pub max_redeliver_count: usize,

    /// The name of the dead letter topic handed to the consumer's dead letter policy.
    pub dead_letter_topic: String,
}
179
// TLS settings applied to the Pulsar client builder. The user-facing
// description comes from the `description` attribute below.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// Path of the certificate chain file passed to the client builder.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Whether the server certificate is verified.
    ///
    /// Insecure connections are allowed only when this is explicitly `false`;
    /// an unset value is treated as `true`.
    pub verify_certificate: Option<bool>,

    /// Whether TLS hostname verification is enabled.
    ///
    /// An unset value is treated as `true`.
    pub verify_hostname: Option<bool>,
}
198
/// Identifies a consumed message whose delivery status is still pending.
///
/// An entry is registered with the finalizer once the message's events have
/// been sent downstream, and is later used to ack or nack the message.
#[derive(Debug)]
struct FinalizerEntry {
    /// Topic the message was received on.
    topic: String,
    /// Pulsar identifier of the message.
    message_id: MessageIdData,
}
204
// NOTE(review): presumably implements example-config generation for this
// source on top of `PulsarSourceConfig`'s `Default` impl (exercised by
// `tests::generate_config`) — confirm against the macro definition.
impl_generate_config_from_default!(PulsarSourceConfig);
206
207#[async_trait::async_trait]
208#[typetag::serde(name = "pulsar")]
209impl SourceConfig for PulsarSourceConfig {
210 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
211 let log_namespace = cx.log_namespace(self.log_namespace);
212
213 let consumer = self.create_consumer().await?;
214 let decoder =
215 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
216 .build()?;
217 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
218
219 Ok(Box::pin(pulsar_source(
220 consumer,
221 decoder,
222 cx.shutdown,
223 cx.out,
224 acknowledgements,
225 log_namespace,
226 )))
227 }
228
229 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
230 let log_namespace = global_log_namespace.merge(self.log_namespace);
231
232 let schema_definition = self
233 .decoding
234 .schema_definition(log_namespace)
235 .with_standard_vector_source_metadata()
236 .with_source_metadata(
237 Self::NAME,
238 Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
239 &owned_value_path!("publish_time"),
240 Kind::timestamp(),
241 Some("publish_time"),
242 )
243 .with_source_metadata(
244 Self::NAME,
245 Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
246 &owned_value_path!("topic"),
247 Kind::bytes(),
248 Some("topic"),
249 )
250 .with_source_metadata(
251 Self::NAME,
252 Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
253 &owned_value_path!("producer_name"),
254 Kind::bytes(),
255 Some("producer_name"),
256 );
257 vec![SourceOutput::new_maybe_logs(
258 self.decoding.output_type(),
259 schema_definition,
260 )]
261 }
262
263 fn can_acknowledge(&self) -> bool {
264 true
265 }
266}
267
268impl PulsarSourceConfig {
269 async fn create_consumer(
270 &self,
271 ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
272 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
273
274 if let Some(auth) = &self.auth {
275 builder = match auth {
276 AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
277 name: name.clone(),
278 data: token.inner().as_bytes().to_vec(),
279 }),
280 AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
281 OAuth2Authentication::client_credentials(OAuth2Params {
282 issuer_url: oauth2.issuer_url.clone(),
283 credentials_url: oauth2.credentials_url.clone(),
284 audience: oauth2.audience.clone(),
285 scope: oauth2.scope.clone(),
286 }),
287 ),
288 };
289 }
290 if let Some(options) = &self.tls {
291 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
292 builder =
293 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
294 builder = builder
295 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
296 }
297
298 let pulsar = builder.build().await?;
299
300 let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
301 .consumer()
302 .with_topics(&self.topics)
303 .with_subscription_type(SubType::Shared)
304 .with_options(pulsar::consumer::ConsumerOptions {
305 priority_level: self.priority_level,
306 ..Default::default()
307 });
308
309 if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
310 consumer_builder =
311 consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
312 max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
313 dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
314 });
315 }
316
317 if let Some(batch_size) = self.batch_size {
318 consumer_builder = consumer_builder.with_batch_size(batch_size);
319 }
320 if let Some(consumer_name) = &self.consumer_name {
321 consumer_builder = consumer_builder.with_consumer_name(consumer_name);
322 }
323 if let Some(subscription_name) = &self.subscription_name {
324 consumer_builder = consumer_builder.with_subscription(subscription_name);
325 }
326
327 let consumer = consumer_builder.build::<String>().await?;
328
329 Ok(consumer)
330 }
331}
332
333async fn pulsar_source(
334 mut consumer: Consumer<String, TokioExecutor>,
335 decoder: Decoder,
336 mut shutdown: ShutdownSignal,
337 mut out: SourceSender,
338 acknowledgements: bool,
339 log_namespace: LogNamespace,
340) -> Result<(), ()> {
341 let (finalizer, mut ack_stream) =
342 OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
343
344 let bytes_received = register!(BytesReceived::from(Protocol::TCP));
345 let events_received = register!(EventsReceived);
346 let pulsar_error_events = register!(PulsarErrorEvent);
347
348 loop {
349 tokio::select! {
350 _ = &mut shutdown => break,
351 entry = ack_stream.next() => {
352 if let Some((status, entry)) = entry {
353 handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
354 }
355 },
356 Some(maybe_message) = consumer.next() => {
357 match maybe_message {
358 Ok(msg) => {
359 bytes_received.emit(ByteSize(msg.payload.data.len()));
360 parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
361 }
362 Err(error) => {
363 pulsar_error_events.emit(PulsarErrorEventData{
364 msg: error.to_string(),
365 error_type:PulsarErrorEventType::Read,
366 });
367 }
368 }
369 },
370 }
371 }
372
373 Ok(())
374}
375
376#[allow(clippy::too_many_arguments)]
377async fn parse_message(
378 msg: Message<String>,
379 decoder: &Decoder,
380 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
381 out: &mut SourceSender,
382 consumer: &mut Consumer<String, TokioExecutor>,
383 log_namespace: LogNamespace,
384 events_received: &Registered<EventsReceived>,
385 pulsar_error_events: &Registered<PulsarErrorEvent>,
386) {
387 let publish_time = i64::try_from(msg.payload.metadata.publish_time)
388 .ok()
389 .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
390 let topic = msg.topic.clone();
391 let producer_name = msg.payload.metadata.producer_name.clone();
392
393 let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
394 let stream = async_stream::stream! {
395 while let Some(next) = stream.next().await {
396 match next {
397 Ok((events, _byte_size)) => {
398 events_received.emit(CountByteSize(
399 events.len(),
400 events.estimated_json_encoded_size_of(),
401 ));
402
403 let now = chrono::Utc::now();
404
405 let events = events.into_iter().map(|mut event| {
406 if let Event::Log(ref mut log) = event {
407 log_namespace.insert_standard_vector_source_metadata(
408 log,
409 PulsarSourceConfig::NAME,
410 now,
411 );
412
413 log_namespace.insert_source_metadata(
414 PulsarSourceConfig::NAME,
415 log,
416 Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
417 path!("publish_time"),
418 publish_time,
419 );
420
421 log_namespace.insert_source_metadata(
422 PulsarSourceConfig::NAME,
423 log,
424 Some(LegacyKey::InsertIfEmpty(path!("topic"))),
425 path!("topic"),
426 topic.clone(),
427 );
428
429 log_namespace.insert_source_metadata(
430 PulsarSourceConfig::NAME,
431 log,
432 Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
433 path!("producer_name"),
434 producer_name.clone(),
435 );
436 }
437 event
438 });
439
440 for event in events {
441 yield event;
442 }
443 }
444 Err(error) => {
445 if !error.can_continue() {
448 break;
449 }
450 }
451 }
452 }
453 }
454 .boxed();
455
456 finalize_event_stream(
457 consumer,
458 finalizer,
459 out,
460 stream,
461 msg.topic.clone(),
462 msg.message_id().clone(),
463 pulsar_error_events,
464 )
465 .await;
466}
467
468async fn finalize_event_stream(
470 consumer: &mut Consumer<String, TokioExecutor>,
471 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
472 out: &mut SourceSender,
473 mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
474 topic: String,
475 message_id: MessageIdData,
476 pulsar_error_events: &Registered<PulsarErrorEvent>,
477) {
478 match finalizer {
479 Some(finalizer) => {
480 let (batch, receiver) = BatchNotifier::new_with_receiver();
481 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
482
483 match out.send_event_stream(&mut stream).await {
484 Err(_error) => {
485 emit!(StreamClosedError { count: 1 });
486 }
487 Ok(_) => {
488 finalizer.add(FinalizerEntry { topic, message_id }, receiver);
489 }
490 }
491 }
492 None => match out.send_event_stream(&mut stream).await {
493 Err(_error) => {
494 emit!(StreamClosedError { count: 1 });
495 }
496 Ok(_) => {
497 if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
498 pulsar_error_events.emit(PulsarErrorEventData {
499 msg: error.to_string(),
500 error_type: PulsarErrorEventType::Ack,
501 });
502 }
503 }
504 },
505 }
506}
507
508async fn handle_ack(
509 consumer: &mut Consumer<String, TokioExecutor>,
510 status: BatchStatus,
511 entry: FinalizerEntry,
512 pulsar_error_events: &Registered<PulsarErrorEvent>,
513) {
514 match status {
515 BatchStatus::Delivered => {
516 if let Err(error) = consumer
517 .ack_with_id(entry.topic.as_str(), entry.message_id)
518 .await
519 {
520 pulsar_error_events.emit(PulsarErrorEventData {
521 msg: error.to_string(),
522 error_type: PulsarErrorEventType::Ack,
523 });
524 }
525 }
526 BatchStatus::Errored | BatchStatus::Rejected => {
527 if let Err(error) = consumer
528 .nack_with_id(entry.topic.as_str(), entry.message_id)
529 .await
530 {
531 pulsar_error_events.emit(PulsarErrorEventData {
532 msg: error.to_string(),
533 error_type: PulsarErrorEventType::NAck,
534 });
535 }
536 }
537 }
538}
539
#[cfg(test)]
mod tests {
    use crate::{sources::pulsar::PulsarSourceConfig, test_util::test_generate_config};

    /// Runs the shared config-generation check against the Pulsar source
    /// configuration.
    #[test]
    fn generate_config() {
        test_generate_config::<PulsarSourceConfig>();
    }
}
549
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the broker under test: the `PULSAR_HOST` environment variable
    /// when it can be read, `127.0.0.1` otherwise.
    fn pulsar_host() -> String {
        match std::env::var("PULSAR_HOST") {
            Ok(host) => host,
            Err(_) => "127.0.0.1".into(),
        }
    }

    /// Formats `scheme://host:port` for the broker under test.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        let host = pulsar_host();
        format!("{scheme}://{host}:{port}")
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Legacy, None).await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Vector, None).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Legacy, None).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, None).await;
    }

    #[tokio::test]
    async fn consumes_event_with_tls() {
        let endpoint = pulsar_address("pulsar+ssl", 6651);
        let tls = TlsOptions {
            ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
            verify_certificate: None,
            verify_hostname: None,
        };
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, Some(tls)).await;
    }

    /// Publishes one message to a freshly named topic and checks that the
    /// source delivers it.
    ///
    /// A producer client is built with the same TLS settings as the source,
    /// the source is spawned with a no-op shutdown signal, and the first
    /// collected event's message field must equal the published text.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        // Every field not listed here keeps its default, which is `None`.
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            tls,
            ..Default::default()
        };

        let mut client_builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &cnf.tls {
            let allow_insecure = !options.verify_certificate.unwrap_or(true);
            let verify_hostname = options.verify_hostname.unwrap_or(true);

            client_builder = client_builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap()
                .with_allow_insecure_connection(allow_insecure)
                .with_tls_hostname_verification_enabled(verify_hostname);
        }

        let pulsar = client_builder.build().await.unwrap();

        let consumer = cnf.create_consumer().await.unwrap();
        let decoding_config = DecodingConfig::new(
            cnf.framing.clone(),
            cnf.decoding.clone(),
            LogNamespace::Legacy,
        );
        let decoder = decoding_config.build().unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let scenario = async move {
            let (tx, rx) = SourceSender::new_test();
            let source = pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            );
            tokio::spawn(source);

            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        };
        let events = assert_source_compliance(&SOURCE_TAGS, scenario).await;

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(events[0].as_log()[message_key], msg.into());
    }
}