vector/sources/
pulsar.rs

1//! `Pulsar` source.
2//! Accepts log events streamed from [`Apache Pulsar`][pulsar].
3//!
4//! [pulsar]: https://pulsar.apache.org/
5use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10    Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12    consumer::Message,
13    message::proto::MessageIdData,
14};
15use tokio_util::codec::FramedRead;
16use vector_lib::{
17    EstimatedJsonEncodedSizeOf,
18    codecs::{
19        StreamDecodingError,
20        decoding::{DeserializerConfig, FramingConfig},
21    },
22    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
23    configurable::configurable_component,
24    event::Event,
25    finalization::BatchStatus,
26    finalizer::OrderedFinalizer,
27    internal_event::{
28        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
29        Registered,
30    },
31    sensitive_string::SensitiveString,
32    shutdown::ShutdownSignal,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37    SourceSender,
38    codecs::{Decoder, DecodingConfig},
39    config::{SourceConfig, SourceContext},
40    event::BatchNotifier,
41    internal_events::{
42        PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
43    },
44    serde::{bool_or_struct, default_decoding, default_framing_message_based},
45};
46
/// Configuration for the `pulsar` source.
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint to which the Pulsar client should connect.
    // Also accepted under the key `address` through the serde alias below.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topic names to read events from.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The Pulsar consumer name.
    // Only forwarded to the consumer builder when set (see `create_consumer`).
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The Pulsar subscription name.
    // Only forwarded to the consumer builder when set (see `create_consumer`).
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer's priority level.
    ///
    /// The broker follows descending priorities. For example, 0=max-priority, 1, 2,...
    ///
    /// In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.
    priority_level: Option<i32>,

    /// Max count of messages in a batch.
    batch_size: Option<u32>,

    #[configurable(derived)]
    auth: Option<AuthConfig>,

    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    // Both serde and `Default` fall back to `default_framing_message_based()`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    // Both serde and `Default` fall back to `default_decoding()`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form for this setting -- confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // When present, the options are applied to the Pulsar client builder in `create_consumer`.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
109
/// Authentication configuration.
// The enum is `untagged`: there is no discriminator key, so serde picks the variant from the
// fields that are present (`name` + `token` for `Basic`, `oauth2` for `OAuth`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Basic authentication.
    Basic {
        /// Basic authentication name/username.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be `token`.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// Basic authentication password/token.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be the signed JWT, in the compact representation.
        // Passed to the client as raw bytes (`token.inner().as_bytes()`) in `create_consumer`.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        token: SensitiveString,
    },

    /// OAuth authentication.
    OAuth {
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
140
/// OAuth2-specific authentication configuration.
// Each field is cloned one-to-one into `pulsar`'s `OAuth2Params` when the client is built
// with the client-credentials provider (see `create_consumer`).
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The credentials URL.
    ///
    /// A data URL is also supported.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
168
/// Dead Letter Queue policy configuration.
// Mirrors `pulsar::consumer::DeadLetterPolicy`; both fields are copied into it unchanged
// when the consumer is built (see `create_consumer`).
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
    pub max_redeliver_count: usize,

    /// Name of the dead letter topic where the failing messages will be sent.
    pub dead_letter_topic: String,
}
179
// The struct description is supplied through the `configurable(description = ...)` attribute
// rather than a doc comment.
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// File path containing a list of PEM encoded certificates.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
    // Treated as `true` when unset; the client's "allow insecure connection" flag is the
    // negation of this value (see `create_consumer`).
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled when `verify_certificate` is false.
    ///
    /// Defaults to `true` if not specified.
    pub verify_hostname: Option<bool>,
}
198
/// Identifies one consumed Pulsar message whose acknowledgement is pending.
///
/// An entry is handed to the finalizer after the message's events were sent downstream;
/// once the batch status is known, `handle_ack` uses it to ack or nack the message.
#[derive(Debug)]
struct FinalizerEntry {
    /// Topic the message was received on (taken from `Message::topic`).
    topic: String,
    /// Identifier of the message (taken from `Message::message_id()`).
    message_id: MessageIdData,
}
204
// NOTE(review): the macro is defined elsewhere in the crate; judging by its name it derives the
// example-config generation for this source from `PulsarSourceConfig::default()` -- confirm there.
// The `generate_config` unit test below exercises the result.
impl_generate_config_from_default!(PulsarSourceConfig);
206
207#[async_trait::async_trait]
208#[typetag::serde(name = "pulsar")]
209impl SourceConfig for PulsarSourceConfig {
210    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
211        let log_namespace = cx.log_namespace(self.log_namespace);
212
213        let consumer = self.create_consumer().await?;
214        let decoder =
215            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
216                .build()?;
217        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
218
219        Ok(Box::pin(pulsar_source(
220            consumer,
221            decoder,
222            cx.shutdown,
223            cx.out,
224            acknowledgements,
225            log_namespace,
226        )))
227    }
228
229    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
230        let log_namespace = global_log_namespace.merge(self.log_namespace);
231
232        let schema_definition = self
233            .decoding
234            .schema_definition(log_namespace)
235            .with_standard_vector_source_metadata()
236            .with_source_metadata(
237                Self::NAME,
238                Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
239                &owned_value_path!("publish_time"),
240                Kind::timestamp(),
241                Some("publish_time"),
242            )
243            .with_source_metadata(
244                Self::NAME,
245                Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
246                &owned_value_path!("topic"),
247                Kind::bytes(),
248                Some("topic"),
249            )
250            .with_source_metadata(
251                Self::NAME,
252                Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
253                &owned_value_path!("producer_name"),
254                Kind::bytes(),
255                Some("producer_name"),
256            );
257        vec![SourceOutput::new_maybe_logs(
258            self.decoding.output_type(),
259            schema_definition,
260        )]
261    }
262
263    fn can_acknowledge(&self) -> bool {
264        true
265    }
266}
267
268impl PulsarSourceConfig {
269    async fn create_consumer(
270        &self,
271    ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
272        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
273
274        if let Some(auth) = &self.auth {
275            builder = match auth {
276                AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
277                    name: name.clone(),
278                    data: token.inner().as_bytes().to_vec(),
279                }),
280                AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
281                    OAuth2Authentication::client_credentials(OAuth2Params {
282                        issuer_url: oauth2.issuer_url.clone(),
283                        credentials_url: oauth2.credentials_url.clone(),
284                        audience: oauth2.audience.clone(),
285                        scope: oauth2.scope.clone(),
286                    }),
287                ),
288            };
289        }
290        if let Some(options) = &self.tls {
291            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
292            builder =
293                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
294            builder = builder
295                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
296        }
297
298        let pulsar = builder.build().await?;
299
300        let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
301            .consumer()
302            .with_topics(&self.topics)
303            .with_subscription_type(SubType::Shared)
304            .with_options(pulsar::consumer::ConsumerOptions {
305                priority_level: self.priority_level,
306                ..Default::default()
307            });
308
309        if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
310            consumer_builder =
311                consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
312                    max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
313                    dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
314                });
315        }
316
317        if let Some(batch_size) = self.batch_size {
318            consumer_builder = consumer_builder.with_batch_size(batch_size);
319        }
320        if let Some(consumer_name) = &self.consumer_name {
321            consumer_builder = consumer_builder.with_consumer_name(consumer_name);
322        }
323        if let Some(subscription_name) = &self.subscription_name {
324            consumer_builder = consumer_builder.with_subscription(subscription_name);
325        }
326
327        let consumer = consumer_builder.build::<String>().await?;
328
329        Ok(consumer)
330    }
331}
332
333async fn pulsar_source(
334    mut consumer: Consumer<String, TokioExecutor>,
335    decoder: Decoder,
336    mut shutdown: ShutdownSignal,
337    mut out: SourceSender,
338    acknowledgements: bool,
339    log_namespace: LogNamespace,
340) -> Result<(), ()> {
341    let (finalizer, mut ack_stream) =
342        OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
343
344    let bytes_received = register!(BytesReceived::from(Protocol::TCP));
345    let events_received = register!(EventsReceived);
346    let pulsar_error_events = register!(PulsarErrorEvent);
347
348    loop {
349        tokio::select! {
350            _ = &mut shutdown => break,
351            entry = ack_stream.next() => {
352                if let Some((status, entry)) = entry {
353                    handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
354                }
355            },
356            Some(maybe_message) = consumer.next() => {
357                match maybe_message {
358                    Ok(msg) => {
359                        bytes_received.emit(ByteSize(msg.payload.data.len()));
360                        parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
361                    }
362                    Err(error) => {
363                        pulsar_error_events.emit(PulsarErrorEventData{
364                            msg: error.to_string(),
365                            error_type:PulsarErrorEventType::Read,
366                        });
367                    }
368                }
369            },
370        }
371    }
372
373    Ok(())
374}
375
376#[allow(clippy::too_many_arguments)]
377async fn parse_message(
378    msg: Message<String>,
379    decoder: &Decoder,
380    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
381    out: &mut SourceSender,
382    consumer: &mut Consumer<String, TokioExecutor>,
383    log_namespace: LogNamespace,
384    events_received: &Registered<EventsReceived>,
385    pulsar_error_events: &Registered<PulsarErrorEvent>,
386) {
387    let publish_time = i64::try_from(msg.payload.metadata.publish_time)
388        .ok()
389        .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
390    let topic = msg.topic.clone();
391    let producer_name = msg.payload.metadata.producer_name.clone();
392
393    let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
394    let stream = async_stream::stream! {
395        while let Some(next) = stream.next().await {
396            match next {
397                Ok((events, _byte_size)) => {
398                    events_received.emit(CountByteSize(
399                        events.len(),
400                        events.estimated_json_encoded_size_of(),
401                    ));
402
403                    let now = chrono::Utc::now();
404
405                    let events = events.into_iter().map(|mut event| {
406                        if let Event::Log(ref mut log) = event {
407                            log_namespace.insert_standard_vector_source_metadata(
408                                log,
409                                PulsarSourceConfig::NAME,
410                                now,
411                            );
412
413                            log_namespace.insert_source_metadata(
414                                PulsarSourceConfig::NAME,
415                                log,
416                                Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
417                                path!("publish_time"),
418                                publish_time,
419                            );
420
421                            log_namespace.insert_source_metadata(
422                                PulsarSourceConfig::NAME,
423                                log,
424                                Some(LegacyKey::InsertIfEmpty(path!("topic"))),
425                                path!("topic"),
426                                topic.clone(),
427                            );
428
429                            log_namespace.insert_source_metadata(
430                                PulsarSourceConfig::NAME,
431                                log,
432                                Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
433                                path!("producer_name"),
434                                producer_name.clone(),
435                            );
436                        }
437                        event
438                    });
439
440                    for event in events {
441                        yield event;
442                    }
443                }
444                Err(error) => {
445                    // Error is logged by `crate::codecs`, no further
446                    // handling is needed here.
447                    if !error.can_continue() {
448                        break;
449                    }
450                }
451            }
452        }
453    }
454    .boxed();
455
456    finalize_event_stream(
457        consumer,
458        finalizer,
459        out,
460        stream,
461        msg.topic.clone(),
462        msg.message_id().clone(),
463        pulsar_error_events,
464    )
465    .await;
466}
467
468/// Send the event stream created by the framed read to the `out` stream.
469async fn finalize_event_stream(
470    consumer: &mut Consumer<String, TokioExecutor>,
471    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
472    out: &mut SourceSender,
473    mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
474    topic: String,
475    message_id: MessageIdData,
476    pulsar_error_events: &Registered<PulsarErrorEvent>,
477) {
478    match finalizer {
479        Some(finalizer) => {
480            let (batch, receiver) = BatchNotifier::new_with_receiver();
481            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
482
483            match out.send_event_stream(&mut stream).await {
484                Err(_error) => {
485                    emit!(StreamClosedError { count: 1 });
486                }
487                Ok(_) => {
488                    finalizer.add(FinalizerEntry { topic, message_id }, receiver);
489                }
490            }
491        }
492        None => match out.send_event_stream(&mut stream).await {
493            Err(_error) => {
494                emit!(StreamClosedError { count: 1 });
495            }
496            Ok(_) => {
497                if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
498                    pulsar_error_events.emit(PulsarErrorEventData {
499                        msg: error.to_string(),
500                        error_type: PulsarErrorEventType::Ack,
501                    });
502                }
503            }
504        },
505    }
506}
507
508async fn handle_ack(
509    consumer: &mut Consumer<String, TokioExecutor>,
510    status: BatchStatus,
511    entry: FinalizerEntry,
512    pulsar_error_events: &Registered<PulsarErrorEvent>,
513) {
514    match status {
515        BatchStatus::Delivered => {
516            if let Err(error) = consumer
517                .ack_with_id(entry.topic.as_str(), entry.message_id)
518                .await
519            {
520                pulsar_error_events.emit(PulsarErrorEventData {
521                    msg: error.to_string(),
522                    error_type: PulsarErrorEventType::Ack,
523                });
524            }
525        }
526        BatchStatus::Errored | BatchStatus::Rejected => {
527            if let Err(error) = consumer
528                .nack_with_id(entry.topic.as_str(), entry.message_id)
529                .await
530            {
531                pulsar_error_events.emit(PulsarErrorEventData {
532                    msg: error.to_string(),
533                    error_type: PulsarErrorEventType::NAck,
534                });
535            }
536        }
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::PulsarSourceConfig;

    /// Checks that config generation works for the `pulsar` source configuration.
    #[test]
    fn generate_config() {
        use crate::test_util::test_generate_config;

        test_generate_config::<PulsarSourceConfig>();
    }
}
549
// Integration tests: compiled only with the `pulsar-integration-tests` feature and expected to
// run against a reachable Pulsar broker (host taken from `PULSAR_HOST`, see `pulsar_host`).
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the Pulsar broker under test: `PULSAR_HOST`, or `127.0.0.1` when unset.
    fn pulsar_host() -> String {
        std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
    }

    /// Builds a `<scheme>://<host>:<port>` broker URL for the host under test.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        format!("{}://{}:{}", scheme, pulsar_host(), port)
    }
    /// Plaintext endpoint, acknowledgements enabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    /// Plaintext endpoint, acknowledgements enabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    /// Plaintext endpoint, acknowledgements disabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    /// Plaintext endpoint, acknowledgements disabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    /// TLS endpoint (`pulsar+ssl`, port 6651) using the test intermediate CA; both
    /// verification options are left unset, so they take their defaults.
    #[tokio::test]
    async fn consumes_event_with_tls() {
        pulsar_send_receive(
            &pulsar_address("pulsar+ssl", 6651),
            false,
            LogNamespace::Vector,
            Some(TlsOptions {
                ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
                verify_certificate: None,
                verify_hostname: None,
            }),
        )
        .await;
    }

    /// Round-trip helper: publishes one message to a fresh random topic and asserts that the
    /// source emits exactly that message as a log event.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        // A random topic name keeps test runs independent of each other.
        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };
        // Separate client used only for producing; it applies the same TLS settings as
        // `create_consumer` does for the source's own client.
        let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &tls {
            builder = builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap();
            builder =
                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
            builder = builder
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
        }

        let pulsar = builder.build().await.unwrap();

        // The consumer is created before the producer sends anything.
        let consumer = cnf.create_consumer().await.unwrap();
        // NOTE(review): the decoder is always built with `LogNamespace::Legacy`, even when the
        // source itself runs with `log_namespace == LogNamespace::Vector`. The assertion below
        // reads the legacy message key, so this looks intentional -- confirm before changing.
        let decoder = DecodingConfig::new(
            cnf.framing.clone(),
            cnf.decoding.clone(),
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            // The source runs in the background with a no-op shutdown signal; the test ends
            // once one event has been collected.
            tokio::spawn(pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            ));
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            msg.into()
        );
    }
}
699        );
700    }
701}