1use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10 Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11 authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12 consumer::Message,
13 message::proto::MessageIdData,
14};
15use vector_lib::{
16 EstimatedJsonEncodedSizeOf,
17 codecs::{
18 Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
19 decoding::{DeserializerConfig, FramingConfig},
20 },
21 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
22 configurable::configurable_component,
23 event::Event,
24 finalization::BatchStatus,
25 finalizer::OrderedFinalizer,
26 internal_event::{
27 ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
28 Registered,
29 },
30 sensitive_string::SensitiveString,
31 shutdown::ShutdownSignal,
32};
33use vrl::{owned_value_path, path, value::Kind};
34
35use crate::{
36 SourceSender,
37 config::{SourceConfig, SourceContext},
38 event::BatchNotifier,
39 internal_events::{
40 PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
41 },
42 serde::{bool_or_struct, default_decoding, default_framing_message_based},
43};
44
/// Configuration for the `pulsar` source.
// Field order and attribute order are significant here: they drive serde
// (de)serialization, the `derivative`-generated `Default`, and the generated
// example configuration.
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint to which the Pulsar client should connect.
    // `address` is accepted as an alternative key for this field.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topic names to read events from.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The Pulsar consumer name.
    ///
    /// When unset, the consumer builder's default name is used.
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The Pulsar subscription name.
    ///
    /// When unset, the consumer builder's default subscription is used.
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer's priority level.
    // Forwarded verbatim to `ConsumerOptions::priority_level` of a `Shared`
    // subscription. NOTE(review): Pulsar treats lower values as higher priority —
    // confirm against the Pulsar docs before surfacing this to users.
    priority_level: Option<i32>,

    /// Max count of messages in a batch.
    // Only applied to the consumer builder when set.
    batch_size: Option<u32>,

    #[configurable(derived)]
    auth: Option<AuthConfig>,

    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // Accepts either a plain boolean or a full acknowledgements struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
107
/// Authentication configuration.
// With `untagged`, serde tries the variants in declaration order, so `Basic` is
// attempted before `OAuth`; do not reorder the variants casually.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Authentication with a method name and token.
    Basic {
        /// Basic authentication name/username.
        ///
        /// Sent to Pulsar as the authentication method name.
        // NOTE(review): for JWT authentication Pulsar conventionally expects the
        // value `token` here — confirm before documenting it for users.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// Basic authentication password/token.
        ///
        /// Sent to Pulsar as the raw UTF-8 bytes of this value.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth2 client-credentials authentication.
    OAuth {
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
138
/// OAuth2-specific authentication configuration.
// Each field is copied one-to-one into `pulsar`'s `OAuth2Params` for the
// client-credentials flow.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The URL of the credentials.
    ///
    /// As the examples show, this may be a `file://` URL or a base64-encoded `data:` URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience, if any.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope, if any.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
166
/// Dead letter queue policy configuration.
// Copied field-for-field into `pulsar::consumer::DeadLetterPolicy` when set.
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum redelivery count forwarded to the consumer's dead letter policy.
    pub max_redeliver_count: usize,

    /// Name of the topic used as the dead letter queue.
    pub dead_letter_topic: String,
}
177
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// Path to the CA certificate chain file loaded by the Pulsar client.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Defaults to `true`. When `false`, the client allows insecure connections.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled.
    ///
    /// Defaults to `true`.
    pub verify_hostname: Option<bool>,
}
196
/// Identifies a Pulsar message whose ack/nack is deferred until its events are
/// finalized (used only when end-to-end acknowledgements are enabled).
#[derive(Debug)]
struct FinalizerEntry {
    // Topic the message was received from; needed to ack/nack by id.
    topic: String,
    // Broker-assigned id of the message to ack/nack.
    message_id: MessageIdData,
}
202
// Generates the example configuration from `PulsarSourceConfig::default()`
// (the `derivative`-derived `Default`). NOTE(review): see the macro definition to confirm.
impl_generate_config_from_default!(PulsarSourceConfig);
204
205#[async_trait::async_trait]
206#[typetag::serde(name = "pulsar")]
207impl SourceConfig for PulsarSourceConfig {
208 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
209 let log_namespace = cx.log_namespace(self.log_namespace);
210
211 let consumer = self.create_consumer().await?;
212 let decoder =
213 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
214 .build()?;
215 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
216
217 Ok(Box::pin(pulsar_source(
218 consumer,
219 decoder,
220 cx.shutdown,
221 cx.out,
222 acknowledgements,
223 log_namespace,
224 )))
225 }
226
227 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
228 let log_namespace = global_log_namespace.merge(self.log_namespace);
229
230 let schema_definition = self
231 .decoding
232 .schema_definition(log_namespace)
233 .with_standard_vector_source_metadata()
234 .with_source_metadata(
235 Self::NAME,
236 Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
237 &owned_value_path!("publish_time"),
238 Kind::timestamp(),
239 Some("publish_time"),
240 )
241 .with_source_metadata(
242 Self::NAME,
243 Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
244 &owned_value_path!("topic"),
245 Kind::bytes(),
246 Some("topic"),
247 )
248 .with_source_metadata(
249 Self::NAME,
250 Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
251 &owned_value_path!("producer_name"),
252 Kind::bytes(),
253 Some("producer_name"),
254 );
255 vec![SourceOutput::new_maybe_logs(
256 self.decoding.output_type(),
257 schema_definition,
258 )]
259 }
260
261 fn can_acknowledge(&self) -> bool {
262 true
263 }
264}
265
266impl PulsarSourceConfig {
267 async fn create_consumer(
268 &self,
269 ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
270 let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
271
272 if let Some(auth) = &self.auth {
273 builder = match auth {
274 AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
275 name: name.clone(),
276 data: token.inner().as_bytes().to_vec(),
277 }),
278 AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
279 OAuth2Authentication::client_credentials(OAuth2Params {
280 issuer_url: oauth2.issuer_url.clone(),
281 credentials_url: oauth2.credentials_url.clone(),
282 audience: oauth2.audience.clone(),
283 scope: oauth2.scope.clone(),
284 }),
285 ),
286 };
287 }
288 if let Some(options) = &self.tls {
289 builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
290 builder =
291 builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
292 builder = builder
293 .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
294 }
295
296 let pulsar = builder.build().await?;
297
298 let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
299 .consumer()
300 .with_topics(&self.topics)
301 .with_subscription_type(SubType::Shared)
302 .with_options(pulsar::consumer::ConsumerOptions {
303 priority_level: self.priority_level,
304 ..Default::default()
305 });
306
307 if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
308 consumer_builder =
309 consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
310 max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
311 dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
312 });
313 }
314
315 if let Some(batch_size) = self.batch_size {
316 consumer_builder = consumer_builder.with_batch_size(batch_size);
317 }
318 if let Some(consumer_name) = &self.consumer_name {
319 consumer_builder = consumer_builder.with_consumer_name(consumer_name);
320 }
321 if let Some(subscription_name) = &self.subscription_name {
322 consumer_builder = consumer_builder.with_subscription(subscription_name);
323 }
324
325 let consumer = consumer_builder.build::<String>().await?;
326
327 Ok(consumer)
328 }
329}
330
331async fn pulsar_source(
332 mut consumer: Consumer<String, TokioExecutor>,
333 decoder: Decoder,
334 mut shutdown: ShutdownSignal,
335 mut out: SourceSender,
336 acknowledgements: bool,
337 log_namespace: LogNamespace,
338) -> Result<(), ()> {
339 let (finalizer, mut ack_stream) =
340 OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
341
342 let bytes_received = register!(BytesReceived::from(Protocol::TCP));
343 let events_received = register!(EventsReceived);
344 let pulsar_error_events = register!(PulsarErrorEvent);
345
346 loop {
347 tokio::select! {
348 _ = &mut shutdown => break,
349 entry = ack_stream.next() => {
350 if let Some((status, entry)) = entry {
351 handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
352 }
353 },
354 Some(maybe_message) = consumer.next() => {
355 match maybe_message {
356 Ok(msg) => {
357 bytes_received.emit(ByteSize(msg.payload.data.len()));
358 parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
359 }
360 Err(error) => {
361 pulsar_error_events.emit(PulsarErrorEventData{
362 msg: error.to_string(),
363 error_type:PulsarErrorEventType::Read,
364 });
365 }
366 }
367 },
368 }
369 }
370
371 Ok(())
372}
373
374#[allow(clippy::too_many_arguments)]
375async fn parse_message(
376 msg: Message<String>,
377 decoder: &Decoder,
378 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
379 out: &mut SourceSender,
380 consumer: &mut Consumer<String, TokioExecutor>,
381 log_namespace: LogNamespace,
382 events_received: &Registered<EventsReceived>,
383 pulsar_error_events: &Registered<PulsarErrorEvent>,
384) {
385 let publish_time = i64::try_from(msg.payload.metadata.publish_time)
386 .ok()
387 .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
388 let topic = msg.topic.clone();
389 let producer_name = msg.payload.metadata.producer_name.clone();
390
391 let mut stream = DecoderFramedRead::new(msg.payload.data.as_ref(), decoder.clone());
392 let stream = async_stream::stream! {
393 while let Some(next) = stream.next().await {
394 match next {
395 Ok((events, _byte_size)) => {
396 events_received.emit(CountByteSize(
397 events.len(),
398 events.estimated_json_encoded_size_of(),
399 ));
400
401 let now = chrono::Utc::now();
402
403 let events = events.into_iter().map(|mut event| {
404 if let Event::Log(ref mut log) = event {
405 log_namespace.insert_standard_vector_source_metadata(
406 log,
407 PulsarSourceConfig::NAME,
408 now,
409 );
410
411 log_namespace.insert_source_metadata(
412 PulsarSourceConfig::NAME,
413 log,
414 Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
415 path!("publish_time"),
416 publish_time,
417 );
418
419 log_namespace.insert_source_metadata(
420 PulsarSourceConfig::NAME,
421 log,
422 Some(LegacyKey::InsertIfEmpty(path!("topic"))),
423 path!("topic"),
424 topic.clone(),
425 );
426
427 log_namespace.insert_source_metadata(
428 PulsarSourceConfig::NAME,
429 log,
430 Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
431 path!("producer_name"),
432 producer_name.clone(),
433 );
434 }
435 event
436 });
437
438 for event in events {
439 yield event;
440 }
441 }
442 Err(error) => {
443 if !error.can_continue() {
446 break;
447 }
448 }
449 }
450 }
451 }
452 .boxed();
453
454 finalize_event_stream(
455 consumer,
456 finalizer,
457 out,
458 stream,
459 msg.topic.clone(),
460 msg.message_id().clone(),
461 pulsar_error_events,
462 )
463 .await;
464}
465
466async fn finalize_event_stream(
468 consumer: &mut Consumer<String, TokioExecutor>,
469 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
470 out: &mut SourceSender,
471 mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
472 topic: String,
473 message_id: MessageIdData,
474 pulsar_error_events: &Registered<PulsarErrorEvent>,
475) {
476 match finalizer {
477 Some(finalizer) => {
478 let (batch, receiver) = BatchNotifier::new_with_receiver();
479 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
480
481 match out.send_event_stream(&mut stream).await {
482 Err(_error) => {
483 emit!(StreamClosedError { count: 1 });
484 }
485 Ok(_) => {
486 finalizer.add(FinalizerEntry { topic, message_id }, receiver);
487 }
488 }
489 }
490 None => match out.send_event_stream(&mut stream).await {
491 Err(_error) => {
492 emit!(StreamClosedError { count: 1 });
493 }
494 Ok(_) => {
495 if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
496 pulsar_error_events.emit(PulsarErrorEventData {
497 msg: error.to_string(),
498 error_type: PulsarErrorEventType::Ack,
499 });
500 }
501 }
502 },
503 }
504}
505
506async fn handle_ack(
507 consumer: &mut Consumer<String, TokioExecutor>,
508 status: BatchStatus,
509 entry: FinalizerEntry,
510 pulsar_error_events: &Registered<PulsarErrorEvent>,
511) {
512 match status {
513 BatchStatus::Delivered => {
514 if let Err(error) = consumer
515 .ack_with_id(entry.topic.as_str(), entry.message_id)
516 .await
517 {
518 pulsar_error_events.emit(PulsarErrorEventData {
519 msg: error.to_string(),
520 error_type: PulsarErrorEventType::Ack,
521 });
522 }
523 }
524 BatchStatus::Errored | BatchStatus::Rejected => {
525 if let Err(error) = consumer
526 .nack_with_id(entry.topic.as_str(), entry.message_id)
527 .await
528 {
529 pulsar_error_events.emit(PulsarErrorEventData {
530 msg: error.to_string(),
531 error_type: PulsarErrorEventType::NAck,
532 });
533 }
534 }
535 }
536}
537
#[cfg(test)]
mod tests {
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against this source's config type.
    #[test]
    fn generate_config() {
        test_generate_config::<super::PulsarSourceConfig>();
    }
}
547
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the broker under test: `PULSAR_HOST`, or `127.0.0.1` when unset.
    fn pulsar_host() -> String {
        std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
    }

    /// Builds a broker URL such as `pulsar://127.0.0.1:6650`.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        format!("{}://{}:{}", scheme, pulsar_host(), port)
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_with_tls() {
        pulsar_send_receive(
            &pulsar_address("pulsar+ssl", 6651),
            false,
            LogNamespace::Vector,
            Some(TlsOptions {
                ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
                verify_certificate: None,
                verify_hostname: None,
            }),
        )
        .await;
    }

    /// Publishes one message to a fresh random topic and asserts the source delivers it.
    ///
    /// The decoder is built with the same `log_namespace` the source runs with, as
    /// `PulsarSourceConfig::build` does. The Vector-namespace cases therefore exercise
    /// Vector-namespace decoding, and the payload is looked up where that namespace
    /// places it.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };
        // Producer-side client, configured with the same TLS settings as the consumer.
        let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &tls {
            builder = builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap();
            builder =
                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
            builder = builder
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
        }

        let pulsar = builder.build().await.unwrap();

        let consumer = cnf.create_consumer().await.unwrap();
        // Use the namespace under test, exactly as `build` does in production.
        let decoder =
            DecodingConfig::new(cnf.framing.clone(), cnf.decoding.clone(), log_namespace)
                .build()
                .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            tokio::spawn(pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            ));
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        let log = events[0].as_log();
        match log_namespace {
            // Legacy namespace: the payload lives under the schema's message key.
            LogNamespace::Legacy => assert_eq!(
                log[log_schema().message_key().unwrap().to_string()],
                msg.into()
            ),
            // Vector namespace: the bytes codec makes the payload the event's root value.
            LogNamespace::Vector => assert_eq!(*log.value(), msg.into()),
        }
    }
}