vector/sources/
pulsar.rs

1//! `Pulsar` source.
2//! Accepts log events streamed from [`Apache Pulsar`][pulsar].
3//!
4//! [pulsar]: https://pulsar.apache.org/
5use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10    Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12    consumer::Message,
13    message::proto::MessageIdData,
14};
15use vector_lib::{
16    EstimatedJsonEncodedSizeOf,
17    codecs::{
18        Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
19        decoding::{DeserializerConfig, FramingConfig},
20    },
21    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
22    configurable::configurable_component,
23    event::Event,
24    finalization::BatchStatus,
25    finalizer::OrderedFinalizer,
26    internal_event::{
27        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
28        Registered,
29    },
30    sensitive_string::SensitiveString,
31    shutdown::ShutdownSignal,
32};
33use vrl::{owned_value_path, path, value::Kind};
34
35use crate::{
36    SourceSender,
37    config::{SourceConfig, SourceContext},
38    event::BatchNotifier,
39    internal_events::{
40        PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
41    },
42    serde::{bool_or_struct, default_decoding, default_framing_message_based},
43};
44
/// Configuration for the `pulsar` source.
//
// NOTE(review): `configurable_component` appears to turn the `///` docs below
// into the published configuration reference, so they are kept verbatim;
// implementation notes are written as plain `//` comments instead.
// The consumer built from this config always uses a `Shared` subscription
// (hard-coded in `create_consumer`).
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint to which the Pulsar client should connect to.
    // Also accepted under the key `address` (serde alias).
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topic names to read events from.
    // All topics are handed to a single consumer via `with_topics`.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The Pulsar consumer name.
    // Only forwarded to the consumer builder when set.
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The Pulsar subscription name.
    // Only forwarded to the consumer builder when set.
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer's priority level.
    ///
    /// The broker follows descending priorities. For example, 0=max-priority, 1, 2,...
    ///
    /// In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.
    // Passed through `ConsumerOptions::priority_level`.
    priority_level: Option<i32>,

    /// Max count of messages in a batch.
    // Only forwarded to the consumer builder when set.
    batch_size: Option<u32>,

    // Optional client authentication (basic/JWT or OAuth2); see `AuthConfig`.
    #[configurable(derived)]
    auth: Option<AuthConfig>,

    // Optional dead letter topic settings applied to the consumer.
    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    // How each message payload is split into frames; defaults to
    // `default_framing_message_based()` for both serde and `Default`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    // How each frame is decoded into events; defaults to `default_decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // Deserialized through `bool_or_struct` (presumably either a bare boolean or
    // the full struct — confirm against that helper).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the global setting (merged via `cx.log_namespace` /
    // `LogNamespace::merge`).
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // When set, the client is configured with the CA chain and verification
    // flags from `TlsOptions` before connecting.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
107
/// Authentication configuration.
// `untagged`: serde tries the variants in declaration order (`Basic`, then
// `OAuth`) and picks the first one whose fields match the input.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Basic authentication.
    // Sent as a Pulsar `Authentication { name, data }`, with `token` converted to
    // its raw bytes (see `create_consumer`).
    Basic {
        /// Basic authentication name/username.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be `token`.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// Basic authentication password/token.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be the signed JWT, in the compact representation.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth authentication.
    // Installed as an `OAuth2Authentication::client_credentials` provider
    // (see `create_consumer`).
    OAuth {
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
138
/// OAuth2-specific authentication configuration.
// Every field is copied verbatim into `pulsar`'s `OAuth2Params` when the client
// is built; the optional fields stay `None` when omitted.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The credentials URL.
    ///
    /// A data URL is also supported.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
166
/// Dead Letter Queue policy configuration.
// Translated field-for-field into `pulsar::consumer::DeadLetterPolicy` when the
// consumer is built; when absent, no dead letter policy is set.
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
    pub max_redeliver_count: usize,

    /// Name of the dead letter topic where the failing messages will be sent.
    pub dead_letter_topic: String,
}
177
// Applied to the Pulsar client builder in `create_consumer` whenever `tls` is set.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// File path containing a list of PEM encoded certificates
    // Required whenever `tls` is configured; loaded via
    // `with_certificate_chain_file`, and a load failure aborts source startup.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
    // Unset is treated as `true`; the negated value is passed to
    // `with_allow_insecure_connection`.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled when verify_certificate is false
    ///
    /// Set to true if not specified.
    // NOTE(review): `create_consumer` passes this value to
    // `with_tls_hostname_verification_enabled` regardless of
    // `verify_certificate`, so the "when verify_certificate is false" wording
    // above may be narrower than the actual behavior — confirm against pulsar-rs.
    pub verify_hostname: Option<bool>,
}
196
/// A Pulsar message whose acknowledgement is deferred until its events have
/// been finalized downstream.
// Registered with the `OrderedFinalizer` in `finalize_event_stream` and handed
// back to `handle_ack`, which acks or nacks `message_id` on `topic`.
#[derive(Debug)]
struct FinalizerEntry {
    // Topic the message was received from; required by `ack_with_id`/`nack_with_id`.
    topic: String,
    // Id of the message to (n)ack.
    message_id: MessageIdData,
}

// Presumably derives the example-config generator from `PulsarSourceConfig`'s
// `Default` impl (exercised by `tests::generate_config`).
impl_generate_config_from_default!(PulsarSourceConfig);
204
205#[async_trait::async_trait]
206#[typetag::serde(name = "pulsar")]
207impl SourceConfig for PulsarSourceConfig {
208    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
209        let log_namespace = cx.log_namespace(self.log_namespace);
210
211        let consumer = self.create_consumer().await?;
212        let decoder =
213            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
214                .build()?;
215        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
216
217        Ok(Box::pin(pulsar_source(
218            consumer,
219            decoder,
220            cx.shutdown,
221            cx.out,
222            acknowledgements,
223            log_namespace,
224        )))
225    }
226
227    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
228        let log_namespace = global_log_namespace.merge(self.log_namespace);
229
230        let schema_definition = self
231            .decoding
232            .schema_definition(log_namespace)
233            .with_standard_vector_source_metadata()
234            .with_source_metadata(
235                Self::NAME,
236                Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
237                &owned_value_path!("publish_time"),
238                Kind::timestamp(),
239                Some("publish_time"),
240            )
241            .with_source_metadata(
242                Self::NAME,
243                Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
244                &owned_value_path!("topic"),
245                Kind::bytes(),
246                Some("topic"),
247            )
248            .with_source_metadata(
249                Self::NAME,
250                Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
251                &owned_value_path!("producer_name"),
252                Kind::bytes(),
253                Some("producer_name"),
254            );
255        vec![SourceOutput::new_maybe_logs(
256            self.decoding.output_type(),
257            schema_definition,
258        )]
259    }
260
261    fn can_acknowledge(&self) -> bool {
262        true
263    }
264}
265
266impl PulsarSourceConfig {
267    async fn create_consumer(
268        &self,
269    ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
270        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
271
272        if let Some(auth) = &self.auth {
273            builder = match auth {
274                AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
275                    name: name.clone(),
276                    data: token.inner().as_bytes().to_vec(),
277                }),
278                AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
279                    OAuth2Authentication::client_credentials(OAuth2Params {
280                        issuer_url: oauth2.issuer_url.clone(),
281                        credentials_url: oauth2.credentials_url.clone(),
282                        audience: oauth2.audience.clone(),
283                        scope: oauth2.scope.clone(),
284                    }),
285                ),
286            };
287        }
288        if let Some(options) = &self.tls {
289            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
290            builder =
291                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
292            builder = builder
293                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
294        }
295
296        let pulsar = builder.build().await?;
297
298        let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
299            .consumer()
300            .with_topics(&self.topics)
301            .with_subscription_type(SubType::Shared)
302            .with_options(pulsar::consumer::ConsumerOptions {
303                priority_level: self.priority_level,
304                ..Default::default()
305            });
306
307        if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
308            consumer_builder =
309                consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
310                    max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
311                    dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
312                });
313        }
314
315        if let Some(batch_size) = self.batch_size {
316            consumer_builder = consumer_builder.with_batch_size(batch_size);
317        }
318        if let Some(consumer_name) = &self.consumer_name {
319            consumer_builder = consumer_builder.with_consumer_name(consumer_name);
320        }
321        if let Some(subscription_name) = &self.subscription_name {
322            consumer_builder = consumer_builder.with_subscription(subscription_name);
323        }
324
325        let consumer = consumer_builder.build::<String>().await?;
326
327        Ok(consumer)
328    }
329}
330
331async fn pulsar_source(
332    mut consumer: Consumer<String, TokioExecutor>,
333    decoder: Decoder,
334    mut shutdown: ShutdownSignal,
335    mut out: SourceSender,
336    acknowledgements: bool,
337    log_namespace: LogNamespace,
338) -> Result<(), ()> {
339    let (finalizer, mut ack_stream) =
340        OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
341
342    let bytes_received = register!(BytesReceived::from(Protocol::TCP));
343    let events_received = register!(EventsReceived);
344    let pulsar_error_events = register!(PulsarErrorEvent);
345
346    loop {
347        tokio::select! {
348            _ = &mut shutdown => break,
349            entry = ack_stream.next() => {
350                if let Some((status, entry)) = entry {
351                    handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
352                }
353            },
354            Some(maybe_message) = consumer.next() => {
355                match maybe_message {
356                    Ok(msg) => {
357                        bytes_received.emit(ByteSize(msg.payload.data.len()));
358                        parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
359                    }
360                    Err(error) => {
361                        pulsar_error_events.emit(PulsarErrorEventData{
362                            msg: error.to_string(),
363                            error_type:PulsarErrorEventType::Read,
364                        });
365                    }
366                }
367            },
368        }
369    }
370
371    Ok(())
372}
373
374#[allow(clippy::too_many_arguments)]
375async fn parse_message(
376    msg: Message<String>,
377    decoder: &Decoder,
378    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
379    out: &mut SourceSender,
380    consumer: &mut Consumer<String, TokioExecutor>,
381    log_namespace: LogNamespace,
382    events_received: &Registered<EventsReceived>,
383    pulsar_error_events: &Registered<PulsarErrorEvent>,
384) {
385    let publish_time = i64::try_from(msg.payload.metadata.publish_time)
386        .ok()
387        .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
388    let topic = msg.topic.clone();
389    let producer_name = msg.payload.metadata.producer_name.clone();
390
391    let mut stream = DecoderFramedRead::new(msg.payload.data.as_ref(), decoder.clone());
392    let stream = async_stream::stream! {
393        while let Some(next) = stream.next().await {
394            match next {
395                Ok((events, _byte_size)) => {
396                    events_received.emit(CountByteSize(
397                        events.len(),
398                        events.estimated_json_encoded_size_of(),
399                    ));
400
401                    let now = chrono::Utc::now();
402
403                    let events = events.into_iter().map(|mut event| {
404                        if let Event::Log(ref mut log) = event {
405                            log_namespace.insert_standard_vector_source_metadata(
406                                log,
407                                PulsarSourceConfig::NAME,
408                                now,
409                            );
410
411                            log_namespace.insert_source_metadata(
412                                PulsarSourceConfig::NAME,
413                                log,
414                                Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
415                                path!("publish_time"),
416                                publish_time,
417                            );
418
419                            log_namespace.insert_source_metadata(
420                                PulsarSourceConfig::NAME,
421                                log,
422                                Some(LegacyKey::InsertIfEmpty(path!("topic"))),
423                                path!("topic"),
424                                topic.clone(),
425                            );
426
427                            log_namespace.insert_source_metadata(
428                                PulsarSourceConfig::NAME,
429                                log,
430                                Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
431                                path!("producer_name"),
432                                producer_name.clone(),
433                            );
434                        }
435                        event
436                    });
437
438                    for event in events {
439                        yield event;
440                    }
441                }
442                Err(error) => {
443                    // Error is logged by `vector_lib::codecs`, no further
444                    // handling is needed here.
445                    if !error.can_continue() {
446                        break;
447                    }
448                }
449            }
450        }
451    }
452    .boxed();
453
454    finalize_event_stream(
455        consumer,
456        finalizer,
457        out,
458        stream,
459        msg.topic.clone(),
460        msg.message_id().clone(),
461        pulsar_error_events,
462    )
463    .await;
464}
465
466/// Send the event stream created by the framed read to the `out` stream.
467async fn finalize_event_stream(
468    consumer: &mut Consumer<String, TokioExecutor>,
469    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
470    out: &mut SourceSender,
471    mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
472    topic: String,
473    message_id: MessageIdData,
474    pulsar_error_events: &Registered<PulsarErrorEvent>,
475) {
476    match finalizer {
477        Some(finalizer) => {
478            let (batch, receiver) = BatchNotifier::new_with_receiver();
479            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
480
481            match out.send_event_stream(&mut stream).await {
482                Err(_error) => {
483                    emit!(StreamClosedError { count: 1 });
484                }
485                Ok(_) => {
486                    finalizer.add(FinalizerEntry { topic, message_id }, receiver);
487                }
488            }
489        }
490        None => match out.send_event_stream(&mut stream).await {
491            Err(_error) => {
492                emit!(StreamClosedError { count: 1 });
493            }
494            Ok(_) => {
495                if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
496                    pulsar_error_events.emit(PulsarErrorEventData {
497                        msg: error.to_string(),
498                        error_type: PulsarErrorEventType::Ack,
499                    });
500                }
501            }
502        },
503    }
504}
505
506async fn handle_ack(
507    consumer: &mut Consumer<String, TokioExecutor>,
508    status: BatchStatus,
509    entry: FinalizerEntry,
510    pulsar_error_events: &Registered<PulsarErrorEvent>,
511) {
512    match status {
513        BatchStatus::Delivered => {
514            if let Err(error) = consumer
515                .ack_with_id(entry.topic.as_str(), entry.message_id)
516                .await
517            {
518                pulsar_error_events.emit(PulsarErrorEventData {
519                    msg: error.to_string(),
520                    error_type: PulsarErrorEventType::Ack,
521                });
522            }
523        }
524        BatchStatus::Errored | BatchStatus::Rejected => {
525            if let Err(error) = consumer
526                .nack_with_id(entry.topic.as_str(), entry.message_id)
527                .await
528            {
529                pulsar_error_events.emit(PulsarErrorEventData {
530                    msg: error.to_string(),
531                    error_type: PulsarErrorEventType::NAck,
532                });
533            }
534        }
535    }
536}
537
#[cfg(test)]
mod tests {
    use super::PulsarSourceConfig;

    /// Runs the shared `test_generate_config` check against this source's
    /// configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PulsarSourceConfig>();
    }
}
547
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the Pulsar broker under test, from `PULSAR_HOST` (default `127.0.0.1`).
    fn pulsar_host() -> String {
        std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
    }

    /// Formats a broker URL such as `pulsar://127.0.0.1:6650`.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        format!("{}://{}:{}", scheme, pulsar_host(), port)
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn consumes_event_with_tls() {
        pulsar_send_receive(
            &pulsar_address("pulsar+ssl", 6651),
            false,
            LogNamespace::Vector,
            Some(TlsOptions {
                ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
                verify_certificate: None,
                verify_hostname: None,
            }),
        )
        .await;
    }

    /// Publishes one message to a fresh random topic. Then asserts that the
    /// source, run with the given acknowledgement, namespace and TLS settings,
    /// emits it as an event carrying the original payload.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };

        // The producer's client needs the same TLS setup as the source's consumer.
        let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &tls {
            builder = builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap();
            builder =
                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
            builder = builder
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
        }

        let pulsar = builder.build().await.unwrap();

        let consumer = cnf.create_consumer().await.unwrap();
        // Decode with the same namespace the source runs with, so the
        // `*_vector_namespace` and TLS cases actually exercise Vector-namespace
        // decoding instead of always producing legacy-shaped events.
        let decoder = DecodingConfig::new(cnf.framing.clone(), cnf.decoding.clone(), log_namespace)
            .build()
            .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            tokio::spawn(pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            ));
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        let log = events[0].as_log();
        match log_namespace {
            // With the Vector namespace the decoded payload is expected at the event root.
            LogNamespace::Vector => assert_eq!(*log.value(), msg.into()),
            // With the legacy namespace it is stored under the global message key.
            LogNamespace::Legacy => assert_eq!(
                log[log_schema().message_key().unwrap().to_string()],
                msg.into()
            ),
        }
    }
}