vector/sources/
pulsar.rs

1//! `Pulsar` source.
2//! Accepts log events streamed from [`Apache Pulsar`][pulsar].
3//!
4//! [pulsar]: https://pulsar.apache.org/
5use chrono::TimeZone;
6use futures_util::StreamExt;
7use pulsar::{
8    authentication::oauth2::{OAuth2Authentication, OAuth2Params},
9    consumer::Message,
10    message::proto::MessageIdData,
11    Authentication, Consumer, Pulsar, SubType, TokioExecutor,
12};
13use std::path::Path;
14use tokio_util::codec::FramedRead;
15
16use vector_lib::{
17    codecs::{
18        decoding::{DeserializerConfig, FramingConfig},
19        StreamDecodingError,
20    },
21    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, SourceOutput},
22    configurable::configurable_component,
23    event::Event,
24    finalization::BatchStatus,
25    finalizer::OrderedFinalizer,
26    internal_event::{
27        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
28        Registered,
29    },
30    sensitive_string::SensitiveString,
31    shutdown::ShutdownSignal,
32    EstimatedJsonEncodedSizeOf,
33};
34use vrl::{owned_value_path, path, value::Kind};
35
36use crate::{
37    codecs::{Decoder, DecodingConfig},
38    config::{SourceConfig, SourceContext},
39    event::BatchNotifier,
40    internal_events::{
41        PulsarErrorEvent, PulsarErrorEventData, PulsarErrorEventType, StreamClosedError,
42    },
43    serde::{bool_or_struct, default_decoding, default_framing_message_based},
44    SourceSender,
45};
46
/// Configuration for the `pulsar` source.
#[configurable_component(source("pulsar", "Collect logs from Apache Pulsar."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PulsarSourceConfig {
    /// The endpoint to which the Pulsar client should connect.
    #[configurable(metadata(docs::examples = "pulsar://127.0.0.1:6650"))]
    // `address` is accepted as an alternative key for this field.
    #[serde(alias = "address")]
    endpoint: String,

    /// The Pulsar topic names to read events from.
    #[configurable(metadata(docs::examples = "[persistent://public/default/my-topic]"))]
    topics: Vec<String>,

    /// The Pulsar consumer name.
    // Only passed to the consumer builder when set (see `create_consumer`).
    #[configurable(metadata(docs::examples = "consumer-name"))]
    consumer_name: Option<String>,

    /// The Pulsar subscription name.
    // Only passed to the consumer builder when set (see `create_consumer`).
    #[configurable(metadata(docs::examples = "subscription_name"))]
    subscription_name: Option<String>,

    /// The consumer's priority level.
    ///
    /// The broker follows descending priorities. For example, 0=max-priority, 1, 2,...
    ///
    /// In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.
    priority_level: Option<i32>,

    /// Max count of messages in a batch.
    // Only passed to the consumer builder when set (see `create_consumer`).
    batch_size: Option<u32>,

    // Optional client authentication; no authentication is configured when absent.
    #[configurable(derived)]
    auth: Option<AuthConfig>,

    // Optional dead letter policy; only applied to the consumer builder when set.
    #[configurable(derived)]
    dead_letter_queue_policy: Option<DeadLetterQueuePolicy>,

    // Falls back to `default_framing_message_based()` both when the key is
    // missing during deserialization and when built through `Default`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    // Falls back to `default_decoding()` both when the key is missing during
    // deserialization and when built through `Default`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    // Deserialized through `bool_or_struct`, so the field has a dedicated
    // deserializer rather than the derived one.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Optional TLS settings; the client builder is left untouched when absent.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsOptions>,
}
109
/// Authentication configuration.
#[configurable_component]
#[derive(Clone, Debug)]
// `untagged`: there is no discriminator key, so the variant is chosen from
// the fields that are present (`name`/`token` versus `oauth2`).
#[serde(deny_unknown_fields, untagged)]
enum AuthConfig {
    /// Basic authentication.
    Basic {
        /// Basic authentication name/username.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be `token`.
        #[configurable(metadata(docs::examples = "${PULSAR_NAME}"))]
        #[configurable(metadata(docs::examples = "name123"))]
        name: String,

        /// Basic authentication password/token.
        ///
        /// This can be used either for basic authentication (username/password) or JWT authentication.
        /// When used for JWT, the value should be the signed JWT, in the compact representation.
        #[configurable(metadata(docs::examples = "${PULSAR_TOKEN}"))]
        #[configurable(metadata(docs::examples = "123456789"))]
        // `create_consumer` passes the raw bytes of this value as the
        // authentication data.
        token: SensitiveString,
    },

    /// OAuth authentication.
    OAuth {
        // Forwarded field by field into `OAuth2Params` by `create_consumer`.
        #[configurable(derived)]
        oauth2: OAuth2Config,
    },
}
140
/// OAuth2-specific authentication configuration.
// Each field is cloned as-is into the matching `OAuth2Params` field by
// `create_consumer`; no validation is performed in this file.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct OAuth2Config {
    /// The issuer URL.
    #[configurable(metadata(docs::examples = "${OAUTH2_ISSUER_URL}"))]
    #[configurable(metadata(docs::examples = "https://oauth2.issuer"))]
    issuer_url: String,

    /// The credentials URL.
    ///
    /// A data URL is also supported.
    #[configurable(metadata(docs::examples = "${OAUTH2_CREDENTIALS_URL}"))]
    #[configurable(metadata(docs::examples = "file:///oauth2_credentials"))]
    #[configurable(metadata(docs::examples = "data:application/json;base64,cHVsc2FyCg=="))]
    credentials_url: String,

    /// The OAuth2 audience.
    #[configurable(metadata(docs::examples = "${OAUTH2_AUDIENCE}"))]
    #[configurable(metadata(docs::examples = "pulsar"))]
    audience: Option<String>,

    /// The OAuth2 scope.
    #[configurable(metadata(docs::examples = "${OAUTH2_SCOPE}"))]
    #[configurable(metadata(docs::examples = "admin"))]
    scope: Option<String>,
}
168
/// Dead Letter Queue policy configuration.
// Both fields are copied into `pulsar::consumer::DeadLetterPolicy` by
// `create_consumer` when this policy is configured.
#[configurable_component]
#[derive(Clone, Debug)]
struct DeadLetterQueuePolicy {
    /// Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
    pub max_redeliver_count: usize,

    /// Name of the dead letter topic where the failing messages will be sent.
    pub dead_letter_topic: String,
}
179
#[configurable_component]
#[configurable(description = "TLS options configuration for the Pulsar client.")]
#[derive(Clone, Debug)]
pub struct TlsOptions {
    /// File path containing a list of PEM encoded certificates.
    #[configurable(metadata(docs::examples = "/etc/certs/chain.pem"))]
    pub ca_file: String,

    /// Enables certificate verification.
    ///
    /// Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates.
    // `None` is treated as `true` by `create_consumer`; the client is told to
    // allow insecure connections only when this is explicitly `false`.
    pub verify_certificate: Option<bool>,

    /// Whether hostname verification is enabled when verify_certificate is false.
    ///
    /// Set to true if not specified.
    // `None` is treated as `true` by `create_consumer`.
    pub verify_hostname: Option<bool>,
}
198
/// Identifies a consumed message so that it can be acked or nacked once the
/// delivery status of the events produced from it is known.
///
/// Entries are registered with the `OrderedFinalizer` in
/// `finalize_event_stream` and consumed by `handle_ack`.
#[derive(Debug)]
struct FinalizerEntry {
    /// Topic the message was received from.
    topic: String,
    /// Pulsar identifier of the message within that topic.
    message_id: MessageIdData,
}
204
// NOTE(review): presumably implements config generation for
// `PulsarSourceConfig` on top of its `Default` implementation; the macro is
// defined outside this file, so confirm against its definition.
impl_generate_config_from_default!(PulsarSourceConfig);
206
207#[async_trait::async_trait]
208#[typetag::serde(name = "pulsar")]
209impl SourceConfig for PulsarSourceConfig {
210    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
211        let log_namespace = cx.log_namespace(self.log_namespace);
212
213        let consumer = self.create_consumer().await?;
214        let decoder =
215            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
216                .build()?;
217        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
218
219        Ok(Box::pin(pulsar_source(
220            consumer,
221            decoder,
222            cx.shutdown,
223            cx.out,
224            acknowledgements,
225            log_namespace,
226        )))
227    }
228
229    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
230        let log_namespace = global_log_namespace.merge(self.log_namespace);
231
232        let schema_definition = self
233            .decoding
234            .schema_definition(log_namespace)
235            .with_standard_vector_source_metadata()
236            .with_source_metadata(
237                Self::NAME,
238                Some(LegacyKey::InsertIfEmpty(owned_value_path!("publish_time"))),
239                &owned_value_path!("publish_time"),
240                Kind::timestamp(),
241                Some("publish_time"),
242            )
243            .with_source_metadata(
244                Self::NAME,
245                Some(LegacyKey::InsertIfEmpty(owned_value_path!("topic"))),
246                &owned_value_path!("topic"),
247                Kind::bytes(),
248                Some("topic"),
249            )
250            .with_source_metadata(
251                Self::NAME,
252                Some(LegacyKey::InsertIfEmpty(owned_value_path!("producer_name"))),
253                &owned_value_path!("producer_name"),
254                Kind::bytes(),
255                Some("producer_name"),
256            );
257        vec![SourceOutput::new_maybe_logs(
258            self.decoding.output_type(),
259            schema_definition,
260        )]
261    }
262
263    fn can_acknowledge(&self) -> bool {
264        true
265    }
266}
267
268impl PulsarSourceConfig {
269    async fn create_consumer(
270        &self,
271    ) -> crate::Result<pulsar::consumer::Consumer<String, TokioExecutor>> {
272        let mut builder = Pulsar::builder(&self.endpoint, TokioExecutor);
273
274        if let Some(auth) = &self.auth {
275            builder = match auth {
276                AuthConfig::Basic { name, token } => builder.with_auth(Authentication {
277                    name: name.clone(),
278                    data: token.inner().as_bytes().to_vec(),
279                }),
280                AuthConfig::OAuth { oauth2 } => builder.with_auth_provider(
281                    OAuth2Authentication::client_credentials(OAuth2Params {
282                        issuer_url: oauth2.issuer_url.clone(),
283                        credentials_url: oauth2.credentials_url.clone(),
284                        audience: oauth2.audience.clone(),
285                        scope: oauth2.scope.clone(),
286                    }),
287                ),
288            };
289        }
290        if let Some(options) = &self.tls {
291            builder = builder.with_certificate_chain_file(Path::new(&options.ca_file))?;
292            builder =
293                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
294            builder = builder
295                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
296        }
297
298        let pulsar = builder.build().await?;
299
300        let mut consumer_builder: pulsar::ConsumerBuilder<TokioExecutor> = pulsar
301            .consumer()
302            .with_topics(&self.topics)
303            .with_subscription_type(SubType::Shared)
304            .with_options(pulsar::consumer::ConsumerOptions {
305                priority_level: self.priority_level,
306                ..Default::default()
307            });
308
309        if let Some(dead_letter_queue_policy) = &self.dead_letter_queue_policy {
310            consumer_builder =
311                consumer_builder.with_dead_letter_policy(pulsar::consumer::DeadLetterPolicy {
312                    max_redeliver_count: dead_letter_queue_policy.max_redeliver_count,
313                    dead_letter_topic: dead_letter_queue_policy.dead_letter_topic.clone(),
314                });
315        }
316
317        if let Some(batch_size) = self.batch_size {
318            consumer_builder = consumer_builder.with_batch_size(batch_size);
319        }
320        if let Some(consumer_name) = &self.consumer_name {
321            consumer_builder = consumer_builder.with_consumer_name(consumer_name);
322        }
323        if let Some(subscription_name) = &self.subscription_name {
324            consumer_builder = consumer_builder.with_subscription(subscription_name);
325        }
326
327        let consumer = consumer_builder.build::<String>().await?;
328
329        Ok(consumer)
330    }
331}
332
333async fn pulsar_source(
334    mut consumer: Consumer<String, TokioExecutor>,
335    decoder: Decoder,
336    mut shutdown: ShutdownSignal,
337    mut out: SourceSender,
338    acknowledgements: bool,
339    log_namespace: LogNamespace,
340) -> Result<(), ()> {
341    let (finalizer, mut ack_stream) =
342        OrderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
343
344    let bytes_received = register!(BytesReceived::from(Protocol::TCP));
345    let events_received = register!(EventsReceived);
346    let pulsar_error_events = register!(PulsarErrorEvent);
347
348    loop {
349        tokio::select! {
350            _ = &mut shutdown => break,
351            entry = ack_stream.next() => {
352                if let Some((status, entry)) = entry {
353                    handle_ack(&mut consumer, status, entry, &pulsar_error_events).await;
354                }
355            },
356            Some(maybe_message) = consumer.next() => {
357                match maybe_message {
358                    Ok(msg) => {
359                        bytes_received.emit(ByteSize(msg.payload.data.len()));
360                        parse_message(msg, &decoder, &finalizer, &mut out, &mut consumer, log_namespace, &events_received, &pulsar_error_events).await;
361                    }
362                    Err(error) => {
363                        pulsar_error_events.emit(PulsarErrorEventData{
364                            msg: error.to_string(),
365                            error_type:PulsarErrorEventType::Read,
366                        });
367                    }
368                }
369            },
370        }
371    }
372
373    Ok(())
374}
375
376#[allow(clippy::too_many_arguments)]
377async fn parse_message(
378    msg: Message<String>,
379    decoder: &Decoder,
380    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
381    out: &mut SourceSender,
382    consumer: &mut Consumer<String, TokioExecutor>,
383    log_namespace: LogNamespace,
384    events_received: &Registered<EventsReceived>,
385    pulsar_error_events: &Registered<PulsarErrorEvent>,
386) {
387    let publish_time = i64::try_from(msg.payload.metadata.publish_time)
388        .ok()
389        .and_then(|millis| chrono::Utc.timestamp_millis_opt(millis).latest());
390    let topic = msg.topic.clone();
391    let producer_name = msg.payload.metadata.producer_name.clone();
392
393    let mut stream = FramedRead::new(msg.payload.data.as_ref(), decoder.clone());
394    let stream = async_stream::stream! {
395        while let Some(next) = stream.next().await {
396            match next {
397                Ok((events, _byte_size)) => {
398                    events_received.emit(CountByteSize(
399                        events.len(),
400                        events.estimated_json_encoded_size_of(),
401                    ));
402
403                    let now = chrono::Utc::now();
404
405                    let events = events.into_iter().map(|mut event| {
406                        if let Event::Log(ref mut log) = event {
407                            log_namespace.insert_standard_vector_source_metadata(
408                                log,
409                                PulsarSourceConfig::NAME,
410                                now,
411                            );
412
413                            log_namespace.insert_source_metadata(
414                                PulsarSourceConfig::NAME,
415                                log,
416                                Some(LegacyKey::InsertIfEmpty(path!("publish_time"))),
417                                path!("publish_time"),
418                                publish_time,
419                            );
420
421                            log_namespace.insert_source_metadata(
422                                PulsarSourceConfig::NAME,
423                                log,
424                                Some(LegacyKey::InsertIfEmpty(path!("topic"))),
425                                path!("topic"),
426                                topic.clone(),
427                            );
428
429                            log_namespace.insert_source_metadata(
430                                PulsarSourceConfig::NAME,
431                                log,
432                                Some(LegacyKey::InsertIfEmpty(path!("producer_name"))),
433                                path!("producer_name"),
434                                producer_name.clone(),
435                            );
436                        }
437                        event
438                    });
439
440                    for event in events {
441                        yield event;
442                    }
443                }
444                Err(error) => {
445                    // Error is logged by `crate::codecs`, no further
446                    // handling is needed here.
447                    if !error.can_continue() {
448                        break;
449                    }
450                }
451            }
452        }
453    }
454    .boxed();
455
456    finalize_event_stream(
457        consumer,
458        finalizer,
459        out,
460        stream,
461        msg.topic.clone(),
462        msg.message_id().clone(),
463        pulsar_error_events,
464    )
465    .await;
466}
467
468/// Send the event stream created by the framed read to the `out` stream.
469async fn finalize_event_stream(
470    consumer: &mut Consumer<String, TokioExecutor>,
471    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
472    out: &mut SourceSender,
473    mut stream: std::pin::Pin<Box<dyn futures_util::Stream<Item = Event> + Send + '_>>,
474    topic: String,
475    message_id: MessageIdData,
476    pulsar_error_events: &Registered<PulsarErrorEvent>,
477) {
478    match finalizer {
479        Some(finalizer) => {
480            let (batch, receiver) = BatchNotifier::new_with_receiver();
481            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
482
483            match out.send_event_stream(&mut stream).await {
484                Err(_error) => {
485                    emit!(StreamClosedError { count: 1 });
486                }
487                Ok(_) => {
488                    finalizer.add(FinalizerEntry { topic, message_id }, receiver);
489                }
490            }
491        }
492        None => match out.send_event_stream(&mut stream).await {
493            Err(_error) => {
494                emit!(StreamClosedError { count: 1 });
495            }
496            Ok(_) => {
497                if let Err(error) = consumer.ack_with_id(topic.as_str(), message_id).await {
498                    pulsar_error_events.emit(PulsarErrorEventData {
499                        msg: error.to_string(),
500                        error_type: PulsarErrorEventType::Ack,
501                    });
502                }
503            }
504        },
505    }
506}
507
508async fn handle_ack(
509    consumer: &mut Consumer<String, TokioExecutor>,
510    status: BatchStatus,
511    entry: FinalizerEntry,
512    pulsar_error_events: &Registered<PulsarErrorEvent>,
513) {
514    match status {
515        BatchStatus::Delivered => {
516            if let Err(error) = consumer
517                .ack_with_id(entry.topic.as_str(), entry.message_id)
518                .await
519            {
520                pulsar_error_events.emit(PulsarErrorEventData {
521                    msg: error.to_string(),
522                    error_type: PulsarErrorEventType::Ack,
523                });
524            }
525        }
526        BatchStatus::Errored | BatchStatus::Rejected => {
527            if let Err(error) = consumer
528                .nack_with_id(entry.topic.as_str(), entry.message_id)
529                .await
530            {
531                pulsar_error_events.emit(PulsarErrorEventData {
532                    msg: error.to_string(),
533                    error_type: PulsarErrorEventType::NAck,
534                });
535            }
536        }
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::PulsarSourceConfig;

    /// Runs the shared config-generation check against `PulsarSourceConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PulsarSourceConfig>();
    }
}
549
// Integration tests that talk to a real Pulsar broker. They are compiled only
// with the `pulsar-integration-tests` feature. The broker host comes from the
// `PULSAR_HOST` environment variable (default `127.0.0.1`); the plain tests
// use port 6650 and the TLS test uses port 6651.
#[cfg(feature = "pulsar-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::config::log_schema;
    use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
    use crate::test_util::{collect_n, random_string, trace_init};
    use crate::tls::TEST_PEM_INTERMEDIATE_CA_PATH;

    /// Returns the broker host from `PULSAR_HOST`, falling back to `127.0.0.1`.
    fn pulsar_host() -> String {
        std::env::var("PULSAR_HOST").unwrap_or_else(|_| "127.0.0.1".into())
    }

    /// Builds a `scheme://host:port` broker URL using `pulsar_host()`.
    fn pulsar_address(scheme: &str, port: u16) -> String {
        format!("{}://{}:{}", scheme, pulsar_host(), port)
    }
    /// Acknowledgements enabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    /// Acknowledgements enabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            true,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    /// Acknowledgements disabled, legacy log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Legacy,
            None,
        )
        .await;
    }

    /// Acknowledgements disabled, Vector log namespace.
    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        pulsar_send_receive(
            &pulsar_address("pulsar", 6650),
            false,
            LogNamespace::Vector,
            None,
        )
        .await;
    }

    /// TLS endpoint with the test CA file; both verification options are left
    /// unset, so `create_consumer` treats them as enabled.
    #[tokio::test]
    async fn consumes_event_with_tls() {
        pulsar_send_receive(
            &pulsar_address("pulsar+ssl", 6651),
            false,
            LogNamespace::Vector,
            Some(TlsOptions {
                ca_file: TEST_PEM_INTERMEDIATE_CA_PATH.into(),
                verify_certificate: None,
                verify_hostname: None,
            }),
        )
        .await;
    }

    /// Shared test body: sends one message to a freshly named topic and checks
    /// that the source emits it as the first event's message field.
    ///
    /// The source consumer is created before the producer sends, and the
    /// source runs in a spawned task while the test collects one event.
    async fn pulsar_send_receive(
        endpoint: &str,
        acknowledgements: bool,
        log_namespace: LogNamespace,
        tls: Option<TlsOptions>,
    ) {
        trace_init();

        // A random topic name is used for every run.
        let topic = format!("test-{}", random_string(10));
        let cnf = PulsarSourceConfig {
            endpoint: endpoint.into(),
            topics: vec![topic.clone()],
            consumer_name: None,
            subscription_name: None,
            priority_level: None,
            batch_size: None,
            auth: None,
            dead_letter_queue_policy: None,
            framing: FramingConfig::Bytes,
            decoding: DeserializerConfig::Bytes,
            acknowledgements: acknowledgements.into(),
            log_namespace: None,
            tls: tls.clone(),
        };
        // Separate client used only for producing; it applies the same TLS
        // settings as `create_consumer`.
        let mut builder = Pulsar::<TokioExecutor>::builder(&cnf.endpoint, TokioExecutor);
        if let Some(options) = &tls {
            builder = builder
                .with_certificate_chain_file(Path::new(&options.ca_file))
                .unwrap();
            builder =
                builder.with_allow_insecure_connection(!options.verify_certificate.unwrap_or(true));
            builder = builder
                .with_tls_hostname_verification_enabled(options.verify_hostname.unwrap_or(true));
        }

        let pulsar = builder.build().await.unwrap();

        let consumer = cnf.create_consumer().await.unwrap();
        // NOTE(review): the decoder is always built with `LogNamespace::Legacy`,
        // even when `log_namespace` is `Vector`. The final assertion reads the
        // legacy message key, so this looks intentional - confirm before
        // changing it.
        let decoder = DecodingConfig::new(
            cnf.framing.clone(),
            cnf.decoding.clone(),
            LogNamespace::Legacy,
        )
        .build()
        .unwrap();

        let mut producer = pulsar.producer().with_topic(topic).build().await.unwrap();

        let msg = "test message";

        let events = assert_source_compliance(&SOURCE_TAGS, async move {
            let (tx, rx) = SourceSender::new_test();
            // The source task is never shut down explicitly (`noop` signal).
            tokio::spawn(pulsar_source(
                consumer,
                decoder,
                ShutdownSignal::noop(),
                tx,
                acknowledgements,
                log_namespace,
            ));
            producer.send_non_blocking(msg).await.unwrap();

            collect_n(rx, 1).await
        })
        .await;

        assert_eq!(
            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
            msg.into()
        );
    }
}