vector/sources/
pulsar.rs

1//! `Pulsar` source.
2//! Accepts log events streamed from [`Apache Pulsar`][pulsar].
3//!
4//! [pulsar]: https://pulsar.apache.org/
5use std::path::Path;
6
7use chrono::TimeZone;
8use futures_util::StreamExt;
9use pulsar::{
10    Authentication, Consumer, Pulsar, SubType, TokioExecutor,
11    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
12    consumer::Message,
13    message::proto::MessageIdData,
14};
15use tokio_util::codec::FramedRead;
16use vector_lib::{
17    EstimatedJsonEncodedSizeOf,
18    codecs::{
19        Decoder, DecodingConfig, StreamDecodingError,
20        decoding::{DeserializerConfig, FramingConfig},
21    },
22    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
23    configurable::configurable_component,
24    event::Event,
25    finalization::BatchStatus,
26    finalizer::OrderedFinalizer,
27    internal_event::{
28        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
29        Registered,
30    },
31    sensitive_string::SensitiveString,
32    shutdown::ShutdownSignal,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37    SourceSender,
38    config::{SourceConfig, SourceContext},
39    event::BatchNotifier,
40    internal_events::{
41        PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
42    },
43    serde::{bool_or_struct, default_decoding, default_framing_message_based},
44};
45
46/// Configuration for the `pulsar` source.
47#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
48#[derive(Clone, Debug, Derivative)]
49#[derivative(Default)]
50#[serde(deny_unknown_fields)]
51pub struct PulsarSourceConfig {
52    /// The endpoint to which the Pulsar client should connect to.
53    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
54    #[serde(alias = "address")]
55    endpoint: String,
56
57    /// The Pulsar topic names to read events from.
58    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
59    topics: Vec<String>,
60
61    /// The Pulsar consumer name.
62    #[configurable(metadata(docs::examples = "consumer-name"))]
63    consumer_name: Option<String>,
64
65    /// The Pulsar subscription name.
66    #[configurable(metadata(docs::examples = "subscription_name"))]
67    subscription_name: Option<String>,
68
69    /// The consumer's priority level.
70    ///
71    /// The broker follows descending priorities. For example, 0=max-priority, 1, 2,...
72    ///
73    /// In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.
74    priority_level: Option<i32>,
75
76    /// Max count of messages in a batch.
77    batch_size: Option<u32>,
78
79    #[configurable(derived)]
80    auth: Option<AuthConfig>,
81
82    #[configurable(derived)]
83    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,
84
85    #[configurable(derived)]
86    #[serde(default = "default_framing_message_based")]
87    #[derivative(Default(value = "default_framing_message_based()"))]
88    framing: FramingConfig,
89
90    #[configurable(derived)]
91    #[serde(default = "default_decoding")]
92    #[derivative(Default(value = "default_decoding()"))]
93    decoding: DeserializerConfig,
94
95    #[configurable(derived)]
96    #[serde(default, deserialize_with = "bool_or_struct")]
97    acknowledgements: SourceAcknowledgementsConfig,
98
99    /// The namespace to use for logs. This overrides the global setting.
100    #[configurable(metadata(docs::hidden))]
101    #[serde(default)]
102    log_namespace: Option<bool>,
103
104    #[configurable(derived)]
105    #[serde(default)]
106    tls: Option<TlsOptions>,
107}
108
109/// Authentication configuration.
110#[configurable_component]
111#[derive(Clone, Debug)]
112#[serde(deny_unknown_fields, untagged)]
113enum AuthConfig {
114    /// Basic authentication.
115    Basic {
116        /// Basic authentication name/username.
117        ///
118        /// This can be used either for basic authentication (username/password) or JWT authentication.
119        /// When used for JWT, the value should be `token`.
120        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
121        #[configurable(metadata(docs::examples = "name123"))]
122        name: String,
123
124        /// Basic authentication password/token.
125        ///
126        /// This can be used either for basic authentication (username/password) or JWT authentication.
127        /// When used for JWT, the value should be the signed JWT, in the compact representation.
128        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
129        #[configurable(metadata(docs::examples = "123456789"))]
130        token: SensitiveString,
131    },
132
133    /// OAuth authentication.
134    OAuth {
135        #[configurable(derived)]
136        oauth2: OAuth2Config,
137    },
138}
139
140/// OAuth2-specific authentication configuration.
141#[configurable_component]
142#[derive(Clone, Debug)]
143pub struct OAuth2Config {
144    /// The issuer URL.
145    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
146    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
147    issuer_url: String,
148
149    /// The credentials URL.
150    ///
151    /// A data URL is also supported.
152    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
153    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
154    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
155    credentials_url: String,
156
157    /// The OAuth2 audience.
158    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
159    #[configurable(metadata(docs::examples = "pulsar"))]
160    audience: Option<String>,
161
162    /// The OAuth2 scope.
163    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
164    #[configurable(metadata(docs::examples = "admin"))]
165    scope: Option<String>,
166}
167
168/// Dead Letter Queue policy configuration.
169#[configurable_component]
170#[derive(Clone, Debug)]
171struct DeadLetterQueuePolicy {
172    /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
173    pub max_redeliver_count: usize,
174
175    /// Name of the dead letter topic where the failing messages will be sent.
176    pub dead_letter_topic: String,
177}
178
179#[configurable_component]
180#[configurable(description = "TLS options configuration for the Pulsar client.")]
181#[derive(Clone, Debug)]
182pub struct TlsOptions {
183    /// File path containing a list of PEM encoded certificates
184    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
185    pub ca_file: String,
186
187    /// Enables certificate verification.
188    ///
189    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
190    pub verify_certificate: Option<bool>,
191
192    /// Whether hostname verification is enabled when verify_certificate is false
193    ///
194    /// Set to true if not specified.
195    pub verify_hostname: Option<bool>,
196}
197
198#[derive(Debug)]
199struct FinalizerEntry {
200    topic: String,
201    message_id: MessageIdData,
202}
203
204impl_generate_config_from_default!(PulsarSourceConfig);
205
206#[async_trait::async_trait]
207#[typetag::serde(name = "pulsar")]
208impl SourceConfig for PulsarSourceConfig {
209    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
210        let log_namespace = cx.log_namespace(self.log_namespace);
211
212        let consumer = self.create_consumer().await?;
213        let decoder =
214            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
215                .build()?;
216        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
217
218        Ok(Box::pin(pulsar_source(
219            consumer,
220            decoder,
221            cx.shutdown,
222            cx.out,
223            acknowledgements,
224            log_namespace,
225        )))
226    }
227
228    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
229        let log_namespace = global_log_namespace.merge(self.log_namespace);
230
231        let schema_definition = self
232            .decoding
233            .schema_definition(log_namespace)
234            .with_standard_vector_source_metadata()
235            .with_source_metadata(
236                Self::NAME,
237                Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
238                &owned_value_path!("publish_time"),
239                Kind::timestamp(),
240                Some("publish_time"),
241            )
242            .with_source_metadata(
243                Self::NAME,
244                Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
245                &owned_value_path!("topic"),
246                Kind::bytes(),
247                Some("topic"),
248            )
249            .with_source_metadata(
250                Self::NAME,
251                Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
252                &owned_value_path!("producer_name"),
253                Kind::bytes(),
254                Some("producer_name"),
255            );
256        vec![SourceOutput::new_maybe_logs(
257            self.decoding.output_type(),
258            schema_definition,
259        )]
260    }
261
262    fn can_acknowledge(&self) -> bool {
263        true
264    }
265}
266
267impl PulsarSourceConfig {
268    async fn create_consumer(
269        &self,
270    ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
271        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
272
273        if let Some(auth) = &self.auth {
274            builder = match auth {
275                AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
276                    name: name.clone(),
277                    data: token.inner().as_bytes().to_vec(),
278                }),
279                AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
280                    OAuth2Authentication::client_credentials(OAuth2Params {
281                        issuer_url: oauth2.issuer_url.clone(),
282                        credentials_url: oauth2.credentials_url.clone(),
283                        audience: oauth2.audience.clone(),
284                        scope: oauth2.scope.clone(),
285                    }),
286                ),
287            };
288        }
289        if let Some(options) = &self.tls {
290            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
291            builder =
292                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
293            builder = builder
294                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
295        }
296
297        let pulsar = builder.build().await?;
298
299        let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
300            .consumer()
301            .with_topics(&self.topics)
302            .with_subscription_type(SubType::Shared)
303            .with_options(pulsar::consumer::ConsumerOptions {
304                priority_level: self.priority_level,
305                ..Default::default()
306            });
307
308        if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
309            consumer_builder =
310                consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
311                    max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
312                    dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
313                });
314        }
315
316        if let Some(batch_size) = self.batch_size {
317            consumer_builder = consumer_builder.with_batch_size(batch_size);
318        }
319        if let Some(consumer_name) = &self.consumer_name {
320            consumer_builder = consumer_builder.with_consumer_name(consumer_name);
321        }
322        if let Some(subscription_name) = &self.subscription_name {
323            consumer_builder = consumer_builder.with_subscription(subscription_name);
324        }
325
326        let consumer = consumer_builder.build::<String>().await?;
327
328        Ok(consumer)
329    }
330}
331
332async fn pulsar_source(
333    mut consumer: Consumer<String, TokioExecutor>,
334    decoder: Decoder,
335    mut shutdown: ShutdownSignal,
336    mut out: SourceSender,
337    acknowledgements: bool,
338    log_namespace: LogNamespace,
339) -> Result<(), ()> {
340    let (finalizer, mut ack_stream) =
341        OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
342
343    let bytes_received = register!(BytesReceived::from(Protocol::TCP));
344    let events_received = register!(EventsReceived);
345    let pulsar_error_events = register!(PulsarErrorEvent);
346
347    loop {
348        tokio::select! {
349            _ = &mut shutdown => break,
350            entry = ack_stream.next() => {
351                if let Some((status, entry)) = entry {
352                    handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
353                }
354            },
355            Some(maybe_message) = consumer.next() => {
356                match maybe_message {
357                    Ok(msg) => {
358                        bytes_received.emit(ByteSize(msg.payload.data.len()));
359                        parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
360                    }
361                    Err(error) => {
362                        pulsar_error_events.emit(PulsarErrorEventData{
363                            msg: error.to_string(),
364                            error_type:PulsarErrorEventType::Read,
365                        });
366                    }
367                }
368            },
369        }
370    }
371
372    Ok(())
373}
374
375#[allow(clippy::too_many_arguments)]
376async fn parse_message(
377    msg: Message<String>,
378    decoder: &Decoder,
379    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
380    out: &mut SourceSender,
381    consumer: &mut Consumer<String, TokioExecutor>,
382    log_namespace: LogNamespace,
383    events_received: &Registered<EventsReceived>,
384    pulsar_error_events: &Registered<PulsarErrorEvent>,
385) {
386    let publish_time = i64::try_from(msg.payload.metadata.publish_time)
387        .ok()
388        .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
389    let topic = msg.topic.clone();
390    let producer_name = msg.payload.metadata.producer_name.clone();
391
392    let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
393    let stream = async_stream::stream! {
394        while let Some(next) = stream.next().await {
395            match next {
396                Ok((events, _byte_size)) => {
397                    events_received.emit(CountByteSize(
398                        events.len(),
399                        events.estimated_json_encoded_size_of(),
400                    ));
401
402                    let now = chrono::Utc::now();
403
404                    let events = events.into_iter().map(|mut event| {
405                        if let Event::Log(ref mut log) = event {
406                            log_namespace.insert_standard_vector_source_metadata(
407                                log,
408                                PulsarSourceConfig::NAME,
409                                now,
410                            );
411
412                            log_namespace.insert_source_metadata(
413                                PulsarSourceConfig::NAME,
414                                log,
415                                Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
416                                path!("publish_time"),
417                                publish_time,
418                            );
419
420                            log_namespace.insert_source_metadata(
421                                PulsarSourceConfig::NAME,
422                                log,
423                                Some(LegacyKey::InsertIfEmpty(path!("topic"))),
424                                path!("topic"),
425                                topic.clone(),
426                            );
427
428                            log_namespace.insert_source_metadata(
429                                PulsarSourceConfig::NAME,
430                                log,
431                                Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
432                                path!("producer_name"),
433                                producer_name.clone(),
434                            );
435                        }
436                        event
437                    });
438
439                    for event in events {
440                        yield event;
441                    }
442                }
443                Err(error) => {
444                    // Error is logged by `vector_lib::codecs`, no further
445                    // handling is needed here.
446                    if !error.can_continue() {
447                        break;
448                    }
449                }
450            }
451        }
452    }
453    .boxed();
454
455    finalize_event_stream(
456        consumer,
457        finalizer,
458        out,
459        stream,
460        msg.topic.clone(),
461        msg.message_id().clone(),
462        pulsar_error_events,
463    )
464    .await;
465}
466
467/// Send the event stream created by the framed read to the `out` stream.
468async fn finalize_event_stream(
469    consumer: &mut Consumer<String, TokioExecutor>,
470    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
471    out: &mut SourceSender,
472    mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
473    topic: String,
474    message_id: MessageIdData,
475    pulsar_error_events: &Registered<PulsarErrorEvent>,
476) {
477    match finalizer {
478        Some(finalizer) => {
479            let (batch, receiver) = BatchNotifier::new_with_receiver();
480            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
481
482            match out.send_event_stream(&mut stream).await {
483                Err(_error) => {
484                    emit!(StreamClosedError { count: 1 });
485                }
486                Ok(_) => {
487                    finalizer.add(FinalizerEntry { topic, message_id }, receiver);
488                }
489            }
490        }
491        None => match out.send_event_stream(&mut stream).await {
492            Err(_error) => {
493                emit!(StreamClosedError { count: 1 });
494            }
495            Ok(_) => {
496                if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
497                    pulsar_error_events.emit(PulsarErrorEventData {
498                        msg: error.to_string(),
499                        error_type: PulsarErrorEventType::Ack,
500                    });
501                }
502            }
503        },
504    }
505}
506
507async fn handle_ack(
508    consumer: &mut Consumer<String, TokioExecutor>,
509    status: BatchStatus,
510    entry: FinalizerEntry,
511    pulsar_error_events: &Registered<PulsarErrorEvent>,
512) {
513    match status {
514        BatchStatus::Delivered => {
515            if let Err(error) = consumer
516                .ack_with_id(entry.topic.as_str(), entry.message_id)
517                .await
518            {
519                pulsar_error_events.emit(PulsarErrorEventData {
520                    msg: error.to_string(),
521                    error_type: PulsarErrorEventType::Ack,
522                });
523            }
524        }
525        BatchStatus::Errored | BatchStatus::Rejected => {
526            if let Err(error) = consumer
527                .nack_with_id(entry.topic.as_str(), entry.message_id)
528                .await
529            {
530                pulsar_error_events.emit(PulsarErrorEventData {
531                    msg: error.to_string(),
532                    error_type: PulsarErrorEventType::NAck,
533                });
534            }
535        }
536    }
537}
538
#[cfg(test)]
mod tests {
    use super::PulsarSourceConfig;

    /// Runs the shared config-generation check against this source's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PulsarSourceConfig>();
    }
}
548
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            collect_n,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string, trace_init,
        },
        tls::TEST_PEM_INTERMEDIATE_CA_PATH,
    };

    /// Host of the Pulsar broker under test: `PULSAR_HOST` if set, otherwise `127.0.0.1`.
    fn pulsar_host() -> String {
        match std::env::var("PULSAR_HOST") {
            Ok(host) => host,
            Err(_) => "127.0.0.1".into(),
        }
    }

    /// Builds a `<scheme>://<host>:<port>` endpoint for the broker under test.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        let host = pulsar_host();
        format!("{scheme}://{host}:{port}")
    }

    /// Round trip with acknowledgements enabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Legacy, None).await;
    }

    /// Round trip with acknowledgements enabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, true, LogNamespace::Vector, None).await;
    }

    /// Round trip with acknowledgements disabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Legacy, None).await;
    }

    /// Round trip with acknowledgements disabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        let endpoint = pulsar_address("pulsar", 6650);
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, None).await;
    }

    /// Round trip over the TLS listener, using the test intermediate CA and
    /// leaving both verification options unset.
    #[tokio::test]
    async fn consumes_event_with_tls() {
        let endpoint = pulsar_address("pulsar+ssl", 6651);
        let tls = TlsOptions {
            ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
            verify_certificate: None,
            verify_hostname: None,
        };
        pulsar_send_receive(&endpoint, false, LogNamespace::Vector, Some(tls)).await;
    }

    /// Publishes one message to a freshly named topic, runs the source against
    /// it, and asserts that the single collected event carries the payload
    /// under the log schema's message key.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };

        // Client used only to produce the test message; its TLS setup mirrors
        // what `create_consumer` does for the source side.
        let producer_client_builder =
            Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        let producer_client_builder = match &tls {
            Some(options) => producer_client_builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap()
                .with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true))
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true)),
            None => producer_client_builder,
        };

        let pulsar = producer_client_builder.build().await.unwrap();

        let consumer = cnf.create_consumer().await.unwrap();
        // NOTE(review): the decoder is always built with the legacy namespace,
        // independent of `log_namespace`; the assertion at the end reads the
        // legacy message key — confirm this is intentional.
        let decoder = DecodingConfig::new(
            cnf.framing.clone(),
            cnf.decoding.clone(),
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            let source = pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            );
            tokio::spawn(source);
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            msg.into()
        );
    }
}